core: integrate instrumentation-java (Census) tracing (#2938)

Main implementation is in CensusTracingModule.

Also a few fix-ups in the stats implementation CensusStatsModule:

- Change header key name from grpc-census-bin to grpc-tags-bin
- Server does not fail on header parse errors. Uses the default instead.

Protect Census-based stats and tracing with static flags: `GrpcUtil.enableCensusStats` and `GrpcUtil.enableCensusTracing`. They keep those features disabled by default until they, especially their wire formats, are stabilized.
diff --git a/core/src/main/java/io/grpc/internal/AbstractManagedChannelImplBuilder.java b/core/src/main/java/io/grpc/internal/AbstractManagedChannelImplBuilder.java
index b257ffd..c97fffc 100644
--- a/core/src/main/java/io/grpc/internal/AbstractManagedChannelImplBuilder.java
+++ b/core/src/main/java/io/grpc/internal/AbstractManagedChannelImplBuilder.java
@@ -39,6 +39,7 @@
 import com.google.common.util.concurrent.MoreExecutors;
 import com.google.instrumentation.stats.Stats;
 import com.google.instrumentation.stats.StatsContextFactory;
+import com.google.instrumentation.trace.Tracing;
 import io.grpc.Attributes;
 import io.grpc.ClientInterceptor;
 import io.grpc.CompressorRegistry;
@@ -287,19 +288,25 @@
       nameResolverFactory = NameResolverProvider.asFactory();
     }
 
-    List<ClientInterceptor> interceptors = this.interceptors;
-    if (recordsStats()) {
+    List<ClientInterceptor> effectiveInterceptors =
+        new ArrayList<ClientInterceptor>(this.interceptors);
+    if (GrpcUtil.enableCensusStats && recordsStats()) {
       StatsContextFactory statsCtxFactory =
           this.statsFactory != null ? this.statsFactory : Stats.getStatsContextFactory();
       if (statsCtxFactory != null) {
-        interceptors = new ArrayList<ClientInterceptor>(this.interceptors);
-        CensusStreamTracerModule census =
-            new CensusStreamTracerModule(statsCtxFactory, GrpcUtil.STOPWATCH_SUPPLIER);
+        CensusStatsModule censusStats =
+            new CensusStatsModule(statsCtxFactory, GrpcUtil.STOPWATCH_SUPPLIER);
         // First interceptor runs last (see ClientInterceptors.intercept()), so that no
         // other interceptor can override the tracer factory we set in CallOptions.
-        interceptors.add(0, census.getClientInterceptor());
+        effectiveInterceptors.add(0, censusStats.getClientInterceptor());
       }
     }
+    if (GrpcUtil.enableCensusTracing) {
+      CensusTracingModule censusTracing =
+          new CensusTracingModule(Tracing.getTracer(), Tracing.getBinaryPropagationHandler());
+      effectiveInterceptors.add(0, censusTracing.getClientInterceptor());
+    }
+
     return new ManagedChannelImpl(
         target,
         // TODO(carl-mastrangelo): Allow clients to pass this in
@@ -316,7 +323,7 @@
         GrpcUtil.STOPWATCH_SUPPLIER,
         idleTimeoutMillis,
         userAgent,
-        interceptors);
+        effectiveInterceptors);
   }
 
   /**
diff --git a/core/src/main/java/io/grpc/internal/AbstractServerImplBuilder.java b/core/src/main/java/io/grpc/internal/AbstractServerImplBuilder.java
index 649b6e5..fbeb147 100644
--- a/core/src/main/java/io/grpc/internal/AbstractServerImplBuilder.java
+++ b/core/src/main/java/io/grpc/internal/AbstractServerImplBuilder.java
@@ -38,6 +38,7 @@
 import com.google.common.util.concurrent.MoreExecutors;
 import com.google.instrumentation.stats.Stats;
 import com.google.instrumentation.stats.StatsContextFactory;
+import com.google.instrumentation.trace.Tracing;
 import io.grpc.BindableService;
 import io.grpc.CompressorRegistry;
 import io.grpc.Context;
@@ -172,14 +173,22 @@
   public ServerImpl build() {
     ArrayList<ServerStreamTracer.Factory> tracerFactories =
         new ArrayList<ServerStreamTracer.Factory>();
-    StatsContextFactory statsFactory =
-        this.statsFactory != null ? this.statsFactory : Stats.getStatsContextFactory();
-    if (statsFactory != null) {
-      CensusStreamTracerModule census =
-          new CensusStreamTracerModule(statsFactory, GrpcUtil.STOPWATCH_SUPPLIER);
-      tracerFactories.add(census.getServerTracerFactory());
+    if (GrpcUtil.enableCensusStats) {
+      StatsContextFactory statsFactory =
+          this.statsFactory != null ? this.statsFactory : Stats.getStatsContextFactory();
+      if (statsFactory != null) {
+        CensusStatsModule censusStats =
+            new CensusStatsModule(statsFactory, GrpcUtil.STOPWATCH_SUPPLIER);
+        tracerFactories.add(censusStats.getServerTracerFactory());
+      }
+    }
+    if (GrpcUtil.enableCensusTracing) {
+      CensusTracingModule censusTracing =
+          new CensusTracingModule(Tracing.getTracer(), Tracing.getBinaryPropagationHandler());
+      tracerFactories.add(censusTracing.getServerTracerFactory());
     }
     tracerFactories.addAll(streamTracerFactories);
+
     io.grpc.internal.InternalServer transportServer =
         buildTransportServer(Collections.unmodifiableList(tracerFactories));
     ServerImpl server = new ServerImpl(getExecutorPool(),
diff --git a/core/src/main/java/io/grpc/internal/CensusStreamTracerModule.java b/core/src/main/java/io/grpc/internal/CensusStatsModule.java
similarity index 89%
rename from core/src/main/java/io/grpc/internal/CensusStreamTracerModule.java
rename to core/src/main/java/io/grpc/internal/CensusStatsModule.java
index a91cec4..ac05e88 100644
--- a/core/src/main/java/io/grpc/internal/CensusStreamTracerModule.java
+++ b/core/src/main/java/io/grpc/internal/CensusStatsModule.java
@@ -63,19 +63,23 @@
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
+import java.util.logging.Level;
+import java.util.logging.Logger;
 import javax.annotation.Nullable;
 
 /**
- * Provides factories for {@link StreamTracer} that records tracing and metrics to Census.
+ * Provides factories for {@link StreamTracer} that records stats to Census.
  *
  * <p>On the client-side, a factory is created for each call, because ClientCall starts earlier than
  * the ClientStream, and in some cases may even not create a ClientStream at all.  Therefore, it's
  * the factory that reports the summary to Census.
  *
- * <p>On the server-side, a tracer is created for each call, because ServerStream starts earlier
- * than the ServerCall.  Therefore, it's the tracer that reports the summary to Census.
+ * <p>On the server-side, there is only one ServerStream per each ServerCall, and ServerStream
+ * starts earlier than the ServerCall.  Therefore, only one tracer is created per stream/call and
+ * it's the tracer that reports the summary to Census.
  */
-final class CensusStreamTracerModule {
+final class CensusStatsModule {
+  private static final Logger logger = Logger.getLogger(CensusStatsModule.class.getName());
   private static final double NANOS_PER_MILLI = TimeUnit.MILLISECONDS.toNanos(1);
   private static final ClientTracer BLANK_CLIENT_TRACER = new ClientTracer();
 
@@ -86,16 +90,17 @@
 
   private final StatsContextFactory statsCtxFactory;
   private final Supplier<Stopwatch> stopwatchSupplier;
-  private final Metadata.Key<StatsContext> statsHeader;
-  private final CensusClientInterceptor clientInterceptor = new CensusClientInterceptor();
+  @VisibleForTesting
+  final Metadata.Key<StatsContext> statsHeader;
+  private final StatsClientInterceptor clientInterceptor = new StatsClientInterceptor();
   private final ServerTracerFactory serverTracerFactory = new ServerTracerFactory();
 
-  CensusStreamTracerModule(
+  CensusStatsModule(
       final StatsContextFactory statsCtxFactory, Supplier<Stopwatch> stopwatchSupplier) {
     this.statsCtxFactory = checkNotNull(statsCtxFactory, "statsCtxFactory");
     this.stopwatchSupplier = checkNotNull(stopwatchSupplier, "stopwatchSupplier");
     this.statsHeader =
-        Metadata.Key.of("grpc-census-bin", new Metadata.BinaryMarshaller<StatsContext>() {
+        Metadata.Key.of("grpc-tags-bin", new Metadata.BinaryMarshaller<StatsContext>() {
             @Override
             public byte[] toBytes(StatsContext context) {
               // TODO(carl-mastrangelo): currently we only make sure the correctness. We may need to
@@ -113,8 +118,9 @@
             public StatsContext parseBytes(byte[] serialized) {
               try {
                 return statsCtxFactory.deserialize(new ByteArrayInputStream(serialized));
-              } catch (IOException e) {
-                throw new RuntimeException(e);
+              } catch (Exception e) {
+                logger.log(Level.FINE, "Failed to parse stats header", e);
+                return statsCtxFactory.getDefault();
               }
             }
           });
@@ -192,7 +198,9 @@
       checkState(streamTracer.compareAndSet(null, tracer),
           "Are you creating multiple streams per call? This class doesn't yet support this case.");
       headers.discardAll(statsHeader);
-      headers.put(statsHeader, parentCtx);
+      if (parentCtx != statsCtxFactory.getDefault()) {
+        headers.put(statsHeader, parentCtx);
+      }
       return tracer;
     }
 
@@ -245,10 +253,10 @@
     private final AtomicLong outboundUncompressedSize = new AtomicLong();
     private final AtomicLong inboundUncompressedSize = new AtomicLong();
 
-    ServerTracer(String fullMethodName, @Nullable StatsContext parentCtx) {
+    ServerTracer(String fullMethodName, StatsContext parentCtx) {
       this.fullMethodName = checkNotNull(fullMethodName, "fullMethodName");
+      this.parentCtx = checkNotNull(parentCtx, "parentCtx");
       this.stopwatch = stopwatchSupplier.get().start();
-      this.parentCtx = parentCtx;
     }
 
     @Override
@@ -308,11 +316,10 @@
 
     @Override
     public <ReqT, RespT> Context filterContext(Context context) {
-      if (parentCtx != null) {
+      if (parentCtx != statsCtxFactory.getDefault()) {
         return context.withValue(STATS_CONTEXT_KEY, parentCtx);
-      } else {
-        return context;
       }
+      return context;
     }
   }
 
@@ -320,14 +327,18 @@
     @Override
     public ServerStreamTracer newServerStreamTracer(String fullMethodName, Metadata headers) {
       StatsContext parentCtx = headers.get(statsHeader);
+      if (parentCtx == null) {
+        parentCtx = statsCtxFactory.getDefault();
+      }
       return new ServerTracer(fullMethodName, parentCtx);
     }
   }
 
-  private class CensusClientInterceptor implements ClientInterceptor {
+  private class StatsClientInterceptor implements ClientInterceptor {
     @Override
     public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(
         MethodDescriptor<ReqT, RespT> method, CallOptions callOptions, Channel next) {
+      // New RPCs on client-side inherit the stats context from the current Context.
       StatsContext parentCtx = STATS_CONTEXT_KEY.get();
       if (parentCtx == null) {
         parentCtx = statsCtxFactory.getDefault();
diff --git a/core/src/main/java/io/grpc/internal/CensusTracingModule.java b/core/src/main/java/io/grpc/internal/CensusTracingModule.java
new file mode 100644
index 0000000..4668033
--- /dev/null
+++ b/core/src/main/java/io/grpc/internal/CensusTracingModule.java
@@ -0,0 +1,307 @@
+/*
+ * Copyright 2016, Google Inc. All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ *    * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ *    * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ *
+ *    * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+package io.grpc.internal;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.instrumentation.trace.ContextUtils.CONTEXT_SPAN_KEY;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.instrumentation.trace.BinaryPropagationHandler;
+import com.google.instrumentation.trace.EndSpanOptions;
+import com.google.instrumentation.trace.Span;
+import com.google.instrumentation.trace.SpanContext;
+import com.google.instrumentation.trace.Status;
+import com.google.instrumentation.trace.Tracer;
+import io.grpc.CallOptions;
+import io.grpc.Channel;
+import io.grpc.ClientCall;
+import io.grpc.ClientInterceptor;
+import io.grpc.ClientStreamTracer;
+import io.grpc.Context;
+import io.grpc.ForwardingClientCall.SimpleForwardingClientCall;
+import io.grpc.ForwardingClientCallListener.SimpleForwardingClientCallListener;
+import io.grpc.Metadata;
+import io.grpc.MethodDescriptor;
+import io.grpc.ServerStreamTracer;
+import io.grpc.StreamTracer;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+import javax.annotation.Nullable;
+
+/**
+ * Provides factories for {@link StreamTracer} that records traces to Census.
+ *
+ * <p>On the client-side, a factory is created for each call, because ClientCall starts earlier than
+ * the ClientStream, and in some cases may even not create a ClientStream at all.  Therefore, it's
+ * the factory that reports the summary to Census.
+ *
+ * <p>On the server-side, there is only one ServerStream per each ServerCall, and ServerStream
+ * starts earlier than the ServerCall.  Therefore, only one tracer is created per stream/call and
+ * it's the tracer that reports the summary to Census.
+ */
+final class CensusTracingModule {
+  private static final Logger logger = Logger.getLogger(CensusTracingModule.class.getName());
+  // TODO(zhangkun83): record NetworkEvent to Span for each message
+  private static final ClientStreamTracer noopClientTracer = new ClientStreamTracer() {};
+
+  private final Tracer censusTracer;
+  private final BinaryPropagationHandler censusTracingPropagationHandler;
+  @VisibleForTesting
+  final Metadata.Key<SpanContext> tracingHeader;
+  private final TracingClientInterceptor clientInterceptor = new TracingClientInterceptor();
+  private final ServerTracerFactory serverTracerFactory = new ServerTracerFactory();
+
+  CensusTracingModule(
+      Tracer censusTracer, final BinaryPropagationHandler censusTracingPropagationHandler) {
+    this.censusTracer = checkNotNull(censusTracer, "censusTracer");
+    this.censusTracingPropagationHandler =
+        checkNotNull(censusTracingPropagationHandler, "censusTracingPropagationHandler");
+    this.tracingHeader =
+        Metadata.Key.of("grpc-trace-bin", new Metadata.BinaryMarshaller<SpanContext>() {
+            @Override
+            public byte[] toBytes(SpanContext context) {
+              return censusTracingPropagationHandler.toBinaryValue(context);
+            }
+
+            @Override
+            public SpanContext parseBytes(byte[] serialized) {
+              try {
+                return censusTracingPropagationHandler.fromBinaryValue(serialized);
+              } catch (Exception e) {
+                logger.log(Level.FINE, "Failed to parse tracing header", e);
+                return SpanContext.INVALID;
+              }
+            }
+          });
+  }
+
+  /**
+   * Creates a {@link ClientCallTracer} for a new call.
+   */
+  @VisibleForTesting
+  ClientCallTracer newClientCallTracer(@Nullable Span parentSpan, String fullMethodName) {
+    return new ClientCallTracer(parentSpan, fullMethodName);
+  }
+
+  /**
+   * Returns the server tracer factory.
+   */
+  ServerStreamTracer.Factory getServerTracerFactory() {
+    return serverTracerFactory;
+  }
+
+  /**
+   * Returns the client interceptor that facilitates Census-based stats reporting.
+   */
+  ClientInterceptor getClientInterceptor() {
+    return clientInterceptor;
+  }
+
+  private static String makeSpanName(String prefix, String fullMethodName) {
+    return prefix + "." + fullMethodName.replace('/', '.');
+  }
+
+  @VisibleForTesting
+  static Status convertStatus(io.grpc.Status grpcStatus) {
+    Status status;
+    switch (grpcStatus.getCode()) {
+      case OK:
+        status = Status.OK;
+        break;
+      case CANCELLED:
+        status = Status.CANCELLED;
+        break;
+      case UNKNOWN:
+        status = Status.UNKNOWN;
+        break;
+      case INVALID_ARGUMENT:
+        status = Status.INVALID_ARGUMENT;
+        break;
+      case DEADLINE_EXCEEDED:
+        status = Status.DEADLINE_EXCEEDED;
+        break;
+      case NOT_FOUND:
+        status = Status.NOT_FOUND;
+        break;
+      case ALREADY_EXISTS:
+        status = Status.ALREADY_EXISTS;
+        break;
+      case PERMISSION_DENIED:
+        status = Status.PERMISSION_DENIED;
+        break;
+      case RESOURCE_EXHAUSTED:
+        status = Status.RESOURCE_EXHAUSTED;
+        break;
+      case FAILED_PRECONDITION:
+        status = Status.FAILED_PRECONDITION;
+        break;
+      case ABORTED:
+        status = Status.ABORTED;
+        break;
+      case OUT_OF_RANGE:
+        status = Status.OUT_OF_RANGE;
+        break;
+      case UNIMPLEMENTED:
+        status = Status.UNIMPLEMENTED;
+        break;
+      case INTERNAL:
+        status = Status.INTERNAL;
+        break;
+      case UNAVAILABLE:
+        status = Status.UNAVAILABLE;
+        break;
+      case DATA_LOSS:
+        status = Status.DATA_LOSS;
+        break;
+      case UNAUTHENTICATED:
+        status = Status.UNAUTHENTICATED;
+        break;
+      default:
+        throw new AssertionError("Unhandled status code " + grpcStatus.getCode());
+    }
+    if (grpcStatus.getDescription() != null) {
+      status = status.withDescription(grpcStatus.getDescription());
+    }
+    return status;
+  }
+
+  private static EndSpanOptions createEndSpanOptions(io.grpc.Status status) {
+    return EndSpanOptions.builder().setStatus(convertStatus(status)).build();
+  }
+
+  @VisibleForTesting
+  final class ClientCallTracer extends ClientStreamTracer.Factory {
+
+    private final String fullMethodName;
+    private final AtomicBoolean callEnded = new AtomicBoolean(false);
+    private final Span span;
+
+    ClientCallTracer(@Nullable Span parentSpan, String fullMethodName) {
+      this.fullMethodName = checkNotNull(fullMethodName, "fullMethodName");
+      this.span =
+          censusTracer.spanBuilder(parentSpan, makeSpanName("Sent", fullMethodName)).startSpan();
+    }
+
+    @Override
+    public ClientStreamTracer newClientStreamTracer(Metadata headers) {
+      headers.discardAll(tracingHeader);
+      headers.put(tracingHeader, span.getContext());
+      return noopClientTracer;
+    }
+
+    /**
+     * Record a finished call and mark the current time as the end time.
+     *
+     * <p>Can be called from any thread without synchronization.  Calling it the second time or more
+     * is a no-op.
+     */
+    void callEnded(io.grpc.Status status) {
+      if (!callEnded.compareAndSet(false, true)) {
+        return;
+      }
+      span.end(createEndSpanOptions(status));
+    }
+  }
+
+  private final class ServerTracer extends ServerStreamTracer {
+    private final String fullMethodName;
+    private final Span span;
+    private final AtomicBoolean streamClosed = new AtomicBoolean(false);
+
+    ServerTracer(String fullMethodName, @Nullable SpanContext remoteSpan) {
+      this.fullMethodName = checkNotNull(fullMethodName, "fullMethodName");
+      this.span =
+          censusTracer.spanBuilderWithRemoteParent(
+              remoteSpan, makeSpanName("Recv", fullMethodName))
+          .startSpan();
+    }
+
+    /**
+     * Record a finished stream and mark the current time as the end time.
+     *
+     * <p>Can be called from any thread without synchronization.  Calling it the second time or more
+     * is a no-op.
+     */
+    @Override
+    public void streamClosed(io.grpc.Status status) {
+      if (!streamClosed.compareAndSet(false, true)) {
+        return;
+      }
+      span.end(createEndSpanOptions(status));
+    }
+
+    @Override
+    public <ReqT, RespT> Context filterContext(Context context) {
+      return context.withValue(CONTEXT_SPAN_KEY, span);
+    }
+  }
+
+  private final class ServerTracerFactory extends ServerStreamTracer.Factory {
+    @SuppressWarnings("ReferenceEquality")
+    @Override
+    public ServerStreamTracer newServerStreamTracer(String fullMethodName, Metadata headers) {
+      SpanContext remoteSpan = headers.get(tracingHeader);
+      if (remoteSpan == SpanContext.INVALID) {
+        remoteSpan = null;
+      }
+      return new ServerTracer(fullMethodName, remoteSpan);
+    }
+  }
+
+  private class TracingClientInterceptor implements ClientInterceptor {
+    @Override
+    public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(
+        MethodDescriptor<ReqT, RespT> method, CallOptions callOptions, Channel next) {
+      // New RPCs on client-side inherit the tracing context from the current Context.
+      Span parentSpan = CONTEXT_SPAN_KEY.get();
+      final ClientCallTracer tracerFactory =
+          newClientCallTracer(parentSpan, method.getFullMethodName());
+      ClientCall<ReqT, RespT> call =
+          next.newCall(method, callOptions.withStreamTracerFactory(tracerFactory));
+      return new SimpleForwardingClientCall<ReqT, RespT>(call) {
+        @Override
+        public void start(Listener<RespT> responseListener, Metadata headers) {
+          delegate().start(
+              new SimpleForwardingClientCallListener<RespT>(responseListener) {
+                @Override
+                public void onClose(io.grpc.Status status, Metadata trailers) {
+                  tracerFactory.callEnded(status);
+                  super.onClose(status, trailers);
+                }
+              },
+              headers);
+        }
+      };
+    }
+  }
+}
diff --git a/core/src/main/java/io/grpc/internal/GrpcUtil.java b/core/src/main/java/io/grpc/internal/GrpcUtil.java
index 0ffd05b..4da7be6 100644
--- a/core/src/main/java/io/grpc/internal/GrpcUtil.java
+++ b/core/src/main/java/io/grpc/internal/GrpcUtil.java
@@ -203,6 +203,20 @@
   public static final long SERVER_KEEPALIVE_TIME_NANOS_DISABLED = Long.MAX_VALUE;
 
   /**
+   * Whether the channel builder and server builder will try to load and use Census stats library.
+   * Delete this and assume always on once Census stats has been fully tested and its wire-format is
+   * stabilized.
+   */
+  public static boolean enableCensusStats;
+
+  /**
+   * Whether the channel builder and server builder will try to load and use Census tracing library.
+   * Delete this and assume always on once Census stats has been fully tested and its wire-format is
+   * stabilized.
+   */
+  public static boolean enableCensusTracing;
+
+  /**
    * Maps HTTP error response status codes to transport codes, as defined in <a
    * href="https://github.com/grpc/grpc/blob/master/doc/http-grpc-status-mapping.md">
    * http-grpc-status-mapping.md</a>. Never returns a status for which {@code status.isOk()} is
diff --git a/core/src/test/java/io/grpc/internal/CensusModulesTest.java b/core/src/test/java/io/grpc/internal/CensusModulesTest.java
new file mode 100644
index 0000000..23e3a00
--- /dev/null
+++ b/core/src/test/java/io/grpc/internal/CensusModulesTest.java
@@ -0,0 +1,728 @@
+/*
+ * Copyright 2017, Google Inc. All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ *    * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ *    * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ *
+ *    * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+package io.grpc.internal;
+
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNotSame;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Matchers.isNull;
+import static org.mockito.Matchers.same;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
+import static org.mockito.Mockito.verifyZeroInteractions;
+import static org.mockito.Mockito.when;
+
+import com.google.instrumentation.stats.RpcConstants;
+import com.google.instrumentation.stats.StatsContext;
+import com.google.instrumentation.stats.TagValue;
+import com.google.instrumentation.trace.Annotation;
+import com.google.instrumentation.trace.AttributeValue;
+import com.google.instrumentation.trace.BinaryPropagationHandler;
+import com.google.instrumentation.trace.ContextUtils;
+import com.google.instrumentation.trace.EndSpanOptions;
+import com.google.instrumentation.trace.Link;
+import com.google.instrumentation.trace.NetworkEvent;
+import com.google.instrumentation.trace.Span;
+import com.google.instrumentation.trace.SpanContext;
+import com.google.instrumentation.trace.SpanFactory;
+import com.google.instrumentation.trace.SpanId;
+import com.google.instrumentation.trace.StartSpanOptions;
+import com.google.instrumentation.trace.TraceId;
+import com.google.instrumentation.trace.TraceOptions;
+import com.google.instrumentation.trace.Tracer;
+import io.grpc.CallOptions;
+import io.grpc.Channel;
+import io.grpc.ClientCall;
+import io.grpc.ClientInterceptor;
+import io.grpc.ClientInterceptors;
+import io.grpc.ClientStreamTracer;
+import io.grpc.Context;
+import io.grpc.Metadata;
+import io.grpc.MethodDescriptor;
+import io.grpc.ServerCall;
+import io.grpc.ServerCallHandler;
+import io.grpc.ServerServiceDefinition;
+import io.grpc.ServerStreamTracer;
+import io.grpc.Status;
+import io.grpc.internal.testing.StatsTestUtils;
+import io.grpc.internal.testing.StatsTestUtils.FakeStatsContextFactory;
+import io.grpc.testing.GrpcServerRule;
+import java.io.ByteArrayInputStream;
+import java.io.InputStream;
+import java.text.ParseException;
+import java.util.Map;
+import java.util.Random;
+import java.util.concurrent.atomic.AtomicReference;
+import javax.annotation.Nullable;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Captor;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+
+/**
+ * Test for {@link CensusStatsModule} and {@link CensusTracingModule}.
+ */
+@RunWith(JUnit4.class)
+public class CensusModulesTest {
+  private static final CallOptions.Key<String> CUSTOM_OPTION =
+      CallOptions.Key.of("option1", "default");
+  private static final CallOptions CALL_OPTIONS =
+      CallOptions.DEFAULT.withOption(CUSTOM_OPTION, "customvalue");
+
+  private static class StringInputStream extends InputStream {
+    final String string;
+
+    StringInputStream(String string) {
+      this.string = string;
+    }
+
+    @Override
+    public int read() {
+      // InProcessTransport doesn't actually read bytes from the InputStream.  The InputStream is
+      // passed to the InProcess server and consumed by MARSHALLER.parse().
+      throw new UnsupportedOperationException("Should not be called");
+    }
+  }
+
+  private static final MethodDescriptor.Marshaller<String> MARSHALLER =
+      new MethodDescriptor.Marshaller<String>() {
+        @Override
+        public InputStream stream(String value) {
+          return new StringInputStream(value);
+        }
+
+        @Override
+        public String parse(InputStream stream) {
+          return ((StringInputStream) stream).string;
+        }
+      };
+
+  private final MethodDescriptor<String, String> method = MethodDescriptor.create(
+      MethodDescriptor.MethodType.UNKNOWN, "package1.service2/method3",
+      MARSHALLER, MARSHALLER);
+  private final FakeClock fakeClock = new FakeClock();
+  private final FakeStatsContextFactory statsCtxFactory = new FakeStatsContextFactory();
+  private final Random random = new Random(0);
+  private final SpanContext fakeClientSpanContext =
+      SpanContext.create(
+          TraceId.generateRandomId(random), SpanId.generateRandomId(random),
+          TraceOptions.builder().build());
+  private final SpanContext fakeClientParentSpanContext =
+      SpanContext.create(
+          TraceId.generateRandomId(random), SpanId.generateRandomId(random),
+          TraceOptions.builder().build());
+  private final SpanContext fakeServerSpanContext =
+      SpanContext.create(
+          TraceId.generateRandomId(random), SpanId.generateRandomId(random),
+          TraceOptions.builder().build());
+  private final SpanContext fakeServerParentSpanContext =
+      SpanContext.create(
+          TraceId.generateRandomId(random), SpanId.generateRandomId(random),
+          TraceOptions.builder().build());
+  private final Span fakeClientSpan = new FakeSpan(fakeClientSpanContext);
+  private final Span fakeServerSpan = new FakeSpan(fakeServerSpanContext);
+  private final Span fakeClientParentSpan = new FakeSpan(fakeClientParentSpanContext);
+  private final Span fakeServerParentSpan = new FakeSpan(fakeServerParentSpanContext);
+  private final Span spyClientSpan = spy(fakeClientSpan);
+  private final Span spyServerSpan = spy(fakeServerSpan);
+  private final byte[] binarySpanContext = new byte[]{3, 1, 5};
+
+  @Rule
+  public final GrpcServerRule grpcServerRule = new GrpcServerRule().directExecutor();
+
+  @Mock
+  private AccessibleSpanFactory mockSpanFactory;
+  @Mock
+  private BinaryPropagationHandler mockTracingPropagationHandler;
+  @Mock
+  private ClientCall.Listener<String> mockClientCallListener;
+  @Mock
+  private ServerCall.Listener<String> mockServerCallListener;
+  @Captor
+  private ArgumentCaptor<CallOptions> callOptionsCaptor;
+  @Captor
+  private ArgumentCaptor<ClientCall.Listener<String>> clientCallListenerCaptor;
+  @Captor
+  private ArgumentCaptor<Status> statusCaptor;
+
+  private Tracer tracer;
+  private CensusStatsModule censusStats;
+  private CensusTracingModule censusTracing;
+
+  @Before
+  @SuppressWarnings("unchecked")
+  public void setUp() throws Exception {
+    MockitoAnnotations.initMocks(this);
+    when(mockSpanFactory.startSpan(any(Span.class), anyString(), any(StartSpanOptions.class)))
+        .thenReturn(spyClientSpan);
+    when(
+        mockSpanFactory.startSpanWithRemoteParent(
+            any(SpanContext.class), anyString(), any(StartSpanOptions.class)))
+        .thenReturn(spyServerSpan);
+    when(mockTracingPropagationHandler.toBinaryValue(any(SpanContext.class)))
+        .thenReturn(binarySpanContext);
+    when(mockTracingPropagationHandler.fromBinaryValue(any(byte[].class)))
+        .thenReturn(fakeServerParentSpanContext);
+    tracer = new Tracer(mockSpanFactory) {};
+    censusStats = new CensusStatsModule(statsCtxFactory, fakeClock.getStopwatchSupplier());
+    censusTracing = new CensusTracingModule(tracer, mockTracingPropagationHandler);
+  }
+
+  @After
+  public void wrapUp() {
+    assertNull(statsCtxFactory.pollRecord());
+  }
+
+  @Test
+  public void clientInterceptorNoCustomTag() {
+    testClientInterceptors(false);
+  }
+
+  @Test
+  public void clientInterceptorCustomTag() {
+    testClientInterceptors(true);
+  }
+
+  // Test that Census ClientInterceptors uses the StatsContext and Span out of the current Context
+  // to create the ClientCallTracer, and that it intercepts ClientCall.Listener.onClose() to call
+  // ClientCallTracer.callEnded().
+  private void testClientInterceptors(boolean nonDefaultContext) {
+    grpcServerRule.getServiceRegistry().addService(
+        ServerServiceDefinition.builder("package1.service2").addMethod(
+            method, new ServerCallHandler<String, String>() {
+                @Override
+                public ServerCall.Listener<String> startCall(
+                    ServerCall<String, String> call, Metadata headers) {
+                  call.sendHeaders(new Metadata());
+                  call.sendMessage("Hello");
+                  call.close(
+                      Status.PERMISSION_DENIED.withDescription("No you don't"), new Metadata());
+                  return mockServerCallListener;
+                }
+              }).build());
+
+    final AtomicReference<CallOptions> capturedCallOptions = new AtomicReference<CallOptions>();
+    ClientInterceptor callOptionsCaptureInterceptor = new ClientInterceptor() {
+        @Override
+        public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(
+            MethodDescriptor<ReqT, RespT> method, CallOptions callOptions, Channel next) {
+          capturedCallOptions.set(callOptions);
+          return next.newCall(method, callOptions);
+        }
+      };
+    Channel interceptedChannel =
+        ClientInterceptors.intercept(
+            grpcServerRule.getChannel(), callOptionsCaptureInterceptor,
+            censusStats.getClientInterceptor(), censusTracing.getClientInterceptor());
+    ClientCall<String, String> call;
+    if (nonDefaultContext) {
+      Context ctx =
+          Context.ROOT.withValues(
+              CensusStatsModule.STATS_CONTEXT_KEY,
+              statsCtxFactory.getDefault().with(
+                  StatsTestUtils.EXTRA_TAG, TagValue.create("extra value")),
+              ContextUtils.CONTEXT_SPAN_KEY,
+              fakeClientParentSpan);
+      Context origCtx = ctx.attach();
+      try {
+        call = interceptedChannel.newCall(method, CALL_OPTIONS);
+      } finally {
+        ctx.detach(origCtx);
+      }
+    } else {
+      assertNull(CensusStatsModule.STATS_CONTEXT_KEY.get());
+      assertNull(ContextUtils.CONTEXT_SPAN_KEY.get());
+      call = interceptedChannel.newCall(method, CALL_OPTIONS);
+    }
+
+    // The interceptor adds tracer factory to CallOptions
+    assertEquals("customvalue", capturedCallOptions.get().getOption(CUSTOM_OPTION));
+    assertEquals(2, capturedCallOptions.get().getStreamTracerFactories().size());
+    assertTrue(
+        capturedCallOptions.get().getStreamTracerFactories().get(0)
+        instanceof CensusTracingModule.ClientCallTracer);
+    assertTrue(
+        capturedCallOptions.get().getStreamTracerFactories().get(1)
+        instanceof CensusStatsModule.ClientCallTracer);
+
+    // Make the call
+    Metadata headers = new Metadata();
+    call.start(mockClientCallListener, headers);
+    assertNull(statsCtxFactory.pollRecord());
+    if (nonDefaultContext) {
+      verify(mockSpanFactory).startSpan(
+          same(fakeClientParentSpan), eq("Sent.package1.service2.method3"),
+          any(StartSpanOptions.class));
+    } else {
+      verify(mockSpanFactory).startSpan(
+          isNull(Span.class), eq("Sent.package1.service2.method3"), any(StartSpanOptions.class));
+    }
+    verify(spyClientSpan, never()).end(any(EndSpanOptions.class));
+
+    // End the call
+    call.halfClose();
+    call.request(1);
+
+    verify(mockClientCallListener).onClose(statusCaptor.capture(), any(Metadata.class));
+    Status status = statusCaptor.getValue();
+    assertEquals(Status.Code.PERMISSION_DENIED, status.getCode());
+    assertEquals("No you don't", status.getDescription());
+
+    // The intercepting listener calls callEnded() on ClientCallTracer, which records to Census.
+    StatsTestUtils.MetricsRecord record = statsCtxFactory.pollRecord();
+    assertNotNull(record);
+    TagValue methodTag = record.tags.get(RpcConstants.RPC_CLIENT_METHOD);
+    assertEquals(method.getFullMethodName(), methodTag.toString());
+    TagValue statusTag = record.tags.get(RpcConstants.RPC_STATUS);
+    assertEquals(Status.Code.PERMISSION_DENIED.toString(), statusTag.toString());
+    if (nonDefaultContext) {
+      TagValue extraTag = record.tags.get(StatsTestUtils.EXTRA_TAG);
+      assertEquals("extra value", extraTag.toString());
+    } else {
+      assertNull(record.tags.get(StatsTestUtils.EXTRA_TAG));
+    }
+    verify(spyClientSpan).end(
+        EndSpanOptions.builder()
+            .setStatus(
+                com.google.instrumentation.trace.Status.PERMISSION_DENIED
+                    .withDescription("No you don't"))
+            .build());
+    verify(spyClientSpan, never()).end();
+  }
+
+  @Test
+  public void clientBasicStatsDefaultContext() {
+    CensusStatsModule.ClientCallTracer callTracer =
+        censusStats.newClientCallTracer(statsCtxFactory.getDefault(), method.getFullMethodName());
+    Metadata headers = new Metadata();
+    ClientStreamTracer tracer = callTracer.newClientStreamTracer(headers);
+
+    fakeClock.forwardTime(30, MILLISECONDS);
+    tracer.outboundHeaders();
+
+    fakeClock.forwardTime(100, MILLISECONDS);
+    tracer.outboundWireSize(1028);
+    tracer.outboundUncompressedSize(1128);
+
+    fakeClock.forwardTime(16, MILLISECONDS);
+    tracer.inboundWireSize(33);
+    tracer.inboundUncompressedSize(67);
+    tracer.outboundWireSize(99);
+    tracer.outboundUncompressedSize(865);
+
+    fakeClock.forwardTime(24, MILLISECONDS);
+    tracer.inboundWireSize(154);
+    tracer.inboundUncompressedSize(552);
+    tracer.streamClosed(Status.OK);
+    callTracer.callEnded(Status.OK);
+
+    StatsTestUtils.MetricsRecord record = statsCtxFactory.pollRecord();
+    assertNotNull(record);
+    assertNoServerContent(record);
+    TagValue methodTag = record.tags.get(RpcConstants.RPC_CLIENT_METHOD);
+    assertEquals(method.getFullMethodName(), methodTag.toString());
+    TagValue statusTag = record.tags.get(RpcConstants.RPC_STATUS);
+    assertEquals(Status.Code.OK.toString(), statusTag.toString());
+    assertNull(record.getMetric(RpcConstants.RPC_CLIENT_ERROR_COUNT));
+    assertEquals(1028 + 99, record.getMetricAsLongOrFail(RpcConstants.RPC_CLIENT_REQUEST_BYTES));
+    assertEquals(1128 + 865,
+        record.getMetricAsLongOrFail(RpcConstants.RPC_CLIENT_UNCOMPRESSED_REQUEST_BYTES));
+    assertEquals(33 + 154, record.getMetricAsLongOrFail(RpcConstants.RPC_CLIENT_RESPONSE_BYTES));
+    assertEquals(67 + 552,
+        record.getMetricAsLongOrFail(RpcConstants.RPC_CLIENT_UNCOMPRESSED_RESPONSE_BYTES));
+    assertEquals(30 + 100 + 16 + 24,
+        record.getMetricAsLongOrFail(RpcConstants.RPC_CLIENT_ROUNDTRIP_LATENCY));
+  }
+
+  @Test
+  public void clientBasicTracingDefaultSpan() {
+    CensusTracingModule.ClientCallTracer callTracer =
+        censusTracing.newClientCallTracer(null, method.getFullMethodName());
+    Metadata headers = new Metadata();
+    ClientStreamTracer tracer = callTracer.newClientStreamTracer(headers);
+    verify(mockSpanFactory).startSpan(
+        isNull(Span.class), eq("Sent.package1.service2.method3"), any(StartSpanOptions.class));
+    verify(spyClientSpan, never()).end(any(EndSpanOptions.class));
+
+    tracer.streamClosed(Status.OK);
+    callTracer.callEnded(Status.OK);
+
+    verify(spyClientSpan).end(
+        EndSpanOptions.builder().setStatus(com.google.instrumentation.trace.Status.OK).build());
+    verifyNoMoreInteractions(mockSpanFactory);
+  }
+
+  @Test
+  public void clientStreamNeverCreatedStillRecordStats() {
+    CensusStatsModule.ClientCallTracer callTracer =
+        censusStats.newClientCallTracer(
+            statsCtxFactory.getDefault(), method.getFullMethodName());
+
+    fakeClock.forwardTime(3000, MILLISECONDS);
+    callTracer.callEnded(Status.DEADLINE_EXCEEDED.withDescription("3 seconds"));
+
+    StatsTestUtils.MetricsRecord record = statsCtxFactory.pollRecord();
+    assertNotNull(record);
+    assertNoServerContent(record);
+    TagValue methodTag = record.tags.get(RpcConstants.RPC_CLIENT_METHOD);
+    assertEquals(method.getFullMethodName(), methodTag.toString());
+    TagValue statusTag = record.tags.get(RpcConstants.RPC_STATUS);
+    assertEquals(Status.Code.DEADLINE_EXCEEDED.toString(), statusTag.toString());
+    assertEquals(1, record.getMetricAsLongOrFail(RpcConstants.RPC_CLIENT_ERROR_COUNT));
+    assertEquals(0, record.getMetricAsLongOrFail(RpcConstants.RPC_CLIENT_REQUEST_BYTES));
+    assertEquals(0,
+        record.getMetricAsLongOrFail(RpcConstants.RPC_CLIENT_UNCOMPRESSED_REQUEST_BYTES));
+    assertEquals(0, record.getMetricAsLongOrFail(RpcConstants.RPC_CLIENT_RESPONSE_BYTES));
+    assertEquals(0,
+        record.getMetricAsLongOrFail(RpcConstants.RPC_CLIENT_UNCOMPRESSED_RESPONSE_BYTES));
+    assertEquals(3000, record.getMetricAsLongOrFail(RpcConstants.RPC_CLIENT_ROUNDTRIP_LATENCY));
+    assertNull(record.getMetric(RpcConstants.RPC_CLIENT_SERVER_ELAPSED_TIME));
+  }
+
+  @Test
+  public void clientStreamNeverCreatedStillRecordTracing() {
+    CensusTracingModule.ClientCallTracer callTracer =
+        censusTracing.newClientCallTracer(fakeClientParentSpan, method.getFullMethodName());
+    verify(mockSpanFactory).startSpan(
+        same(fakeClientParentSpan), eq("Sent.package1.service2.method3"),
+        any(StartSpanOptions.class));
+
+    callTracer.callEnded(Status.DEADLINE_EXCEEDED.withDescription("3 seconds"));
+    verify(spyClientSpan).end(
+        EndSpanOptions.builder()
+            .setStatus(
+                com.google.instrumentation.trace.Status.DEADLINE_EXCEEDED
+                    .withDescription("3 seconds"))
+            .build());
+    verify(spyClientSpan, never()).end();
+  }
+
+  @Test
+  public void statsHeadersPropagateTags() {
+    // EXTRA_TAG is propagated by the FakeStatsContextFactory. Note that not all tags are
+    // propagated.  The StatsContextFactory decides which tags are to propagated.  gRPC facilitates
+    // the propagation by putting them in the headers.
+    StatsContext clientCtx = statsCtxFactory.getDefault().with(
+        StatsTestUtils.EXTRA_TAG, TagValue.create("extra-tag-value-897"));
+    CensusStatsModule.ClientCallTracer callTracer =
+        censusStats.newClientCallTracer(clientCtx, method.getFullMethodName());
+    Metadata headers = new Metadata();
+    // This propagates clientCtx to headers
+    callTracer.newClientStreamTracer(headers);
+    assertTrue(headers.containsKey(censusStats.statsHeader));
+
+    ServerStreamTracer serverTracer =
+        censusStats.getServerTracerFactory().newServerStreamTracer(
+            method.getFullMethodName(), headers);
+    // Server tracer deserializes clientCtx from the headers, so that it records stats with the
+    // propagated tags.
+    Context serverContext = serverTracer.filterContext(Context.ROOT);
+    // It also put clientCtx in the Context seen by the call handler
+    assertEquals(clientCtx, CensusStatsModule.STATS_CONTEXT_KEY.get(serverContext));
+
+
+    // Verifies that the server tracer records the status with the propagated tag
+    serverTracer.streamClosed(Status.OK);
+
+    StatsTestUtils.MetricsRecord serverRecord = statsCtxFactory.pollRecord();
+    assertNotNull(serverRecord);
+    assertNoClientContent(serverRecord);
+    TagValue serverMethodTag = serverRecord.tags.get(RpcConstants.RPC_SERVER_METHOD);
+    assertEquals(method.getFullMethodName(), serverMethodTag.toString());
+    TagValue serverStatusTag = serverRecord.tags.get(RpcConstants.RPC_STATUS);
+    assertEquals(Status.Code.OK.toString(), serverStatusTag.toString());
+    assertNull(serverRecord.getMetric(RpcConstants.RPC_SERVER_ERROR_COUNT));
+    TagValue serverPropagatedTag = serverRecord.tags.get(StatsTestUtils.EXTRA_TAG);
+    assertEquals("extra-tag-value-897", serverPropagatedTag.toString());
+
+    // Verifies that the client tracer factory uses clientCtx, which includes the custom tags, to
+    // record stats.
+    callTracer.callEnded(Status.OK);
+
+    StatsTestUtils.MetricsRecord clientRecord = statsCtxFactory.pollRecord();
+    assertNotNull(clientRecord);
+    assertNoServerContent(clientRecord);
+    TagValue clientMethodTag = clientRecord.tags.get(RpcConstants.RPC_CLIENT_METHOD);
+    assertEquals(method.getFullMethodName(), clientMethodTag.toString());
+    TagValue clientStatusTag = clientRecord.tags.get(RpcConstants.RPC_STATUS);
+    assertEquals(Status.Code.OK.toString(), clientStatusTag.toString());
+    assertNull(clientRecord.getMetric(RpcConstants.RPC_CLIENT_ERROR_COUNT));
+    TagValue clientPropagatedTag = clientRecord.tags.get(StatsTestUtils.EXTRA_TAG);
+    assertEquals("extra-tag-value-897", clientPropagatedTag.toString());
+  }
+
+  @Test
+  public void statsHeadersNotPropagateDefaultContext() {
+    CensusStatsModule.ClientCallTracer callTracer =
+        censusStats.newClientCallTracer(statsCtxFactory.getDefault(), method.getFullMethodName());
+    Metadata headers = new Metadata();
+    callTracer.newClientStreamTracer(headers);
+    assertFalse(headers.containsKey(censusStats.statsHeader));
+  }
+
+  @Test
+  public void statsHeaderMalformed() {
+    // Construct a malformed header and make sure parsing it will throw
+    byte[] statsHeaderValue = new byte[]{1};
+    Metadata.Key<byte[]> arbitraryStatsHeader =
+        Metadata.Key.of("grpc-tags-bin", Metadata.BINARY_BYTE_MARSHALLER);
+    try {
+      statsCtxFactory.deserialize(new ByteArrayInputStream(statsHeaderValue));
+      fail("Should have thrown");
+    } catch (Exception e) {
+      // Expected
+    }
+
+    // But the header key will return a default context for it
+    Metadata headers = new Metadata();
+    assertNull(headers.get(censusStats.statsHeader));
+    headers.put(arbitraryStatsHeader, statsHeaderValue);
+    assertSame(statsCtxFactory.getDefault(), headers.get(censusStats.statsHeader));
+  }
+
+  @Test
+  public void traceHeadersPropagateSpanContext() throws Exception {
+    CensusTracingModule.ClientCallTracer callTracer =
+        censusTracing.newClientCallTracer(fakeClientParentSpan, method.getFullMethodName());
+    Metadata headers = new Metadata();
+    callTracer.newClientStreamTracer(headers);
+
+    verify(mockTracingPropagationHandler).toBinaryValue(same(fakeClientSpanContext));
+    verifyNoMoreInteractions(mockTracingPropagationHandler);
+    verify(mockSpanFactory).startSpan(
+        same(fakeClientParentSpan), eq("Sent.package1.service2.method3"),
+        any(StartSpanOptions.class));
+    verifyNoMoreInteractions(mockSpanFactory);
+    assertTrue(headers.containsKey(censusTracing.tracingHeader));
+
+    ServerStreamTracer serverTracer =
+        censusTracing.getServerTracerFactory().newServerStreamTracer(
+            method.getFullMethodName(), headers);
+    verify(mockTracingPropagationHandler).fromBinaryValue(same(binarySpanContext));
+    verify(mockSpanFactory).startSpanWithRemoteParent(
+        same(fakeServerParentSpanContext), eq("Recv.package1.service2.method3"),
+        any(StartSpanOptions.class));
+
+    Context filteredContext = serverTracer.filterContext(Context.ROOT);
+    assertSame(spyServerSpan, ContextUtils.CONTEXT_SPAN_KEY.get(filteredContext));
+  }
+
+  @Test
+  public void traceHeaderMalformed() throws Exception {
+    // As comparison, normal header parsing
+    Metadata headers = new Metadata();
+    headers.put(censusTracing.tracingHeader, fakeClientSpanContext);
+    // mockTracingPropagationHandler was stubbed to always return fakeServerParentSpanContext
+    assertSame(fakeServerParentSpanContext, headers.get(censusTracing.tracingHeader));
+
+    // Make BinaryPropagationHandler always throw when parsing the header
+    when(mockTracingPropagationHandler.fromBinaryValue(any(byte[].class)))
+        .thenThrow(new ParseException("Malformed header", 0));
+
+    headers = new Metadata();
+    assertNull(headers.get(censusTracing.tracingHeader));
+    headers.put(censusTracing.tracingHeader, fakeClientSpanContext);
+    assertSame(SpanContext.INVALID, headers.get(censusTracing.tracingHeader));
+    assertNotSame(fakeServerParentSpanContext, SpanContext.INVALID);
+
+    // A null Span is used as the parent in this case
+    censusTracing.getServerTracerFactory().newServerStreamTracer(
+        method.getFullMethodName(), headers);
+    verify(mockSpanFactory).startSpanWithRemoteParent(
+        isNull(SpanContext.class), eq("Recv.package1.service2.method3"),
+        any(StartSpanOptions.class));
+  }
+
+  @Test
+  public void serverBasicStatsNoHeaders() {
+    ServerStreamTracer.Factory tracerFactory = censusStats.getServerTracerFactory();
+    ServerStreamTracer tracer =
+        tracerFactory.newServerStreamTracer(method.getFullMethodName(), new Metadata());
+
+    Context filteredContext = tracer.filterContext(Context.ROOT);
+    assertNull(CensusStatsModule.STATS_CONTEXT_KEY.get(filteredContext));
+
+    tracer.inboundWireSize(34);
+    tracer.inboundUncompressedSize(67);
+
+    fakeClock.forwardTime(100, MILLISECONDS);
+    tracer.outboundWireSize(1028);
+    tracer.outboundUncompressedSize(1128);
+
+    fakeClock.forwardTime(16, MILLISECONDS);
+    tracer.inboundWireSize(154);
+    tracer.inboundUncompressedSize(552);
+    tracer.outboundWireSize(99);
+    tracer.outboundUncompressedSize(865);
+
+    fakeClock.forwardTime(24, MILLISECONDS);
+
+    tracer.streamClosed(Status.CANCELLED);
+
+    StatsTestUtils.MetricsRecord record = statsCtxFactory.pollRecord();
+    assertNotNull(record);
+    assertNoClientContent(record);
+    TagValue methodTag = record.tags.get(RpcConstants.RPC_SERVER_METHOD);
+    assertEquals(method.getFullMethodName(), methodTag.toString());
+    TagValue statusTag = record.tags.get(RpcConstants.RPC_STATUS);
+    assertEquals(Status.Code.CANCELLED.toString(), statusTag.toString());
+    assertEquals(1, record.getMetricAsLongOrFail(RpcConstants.RPC_SERVER_ERROR_COUNT));
+    assertEquals(1028 + 99, record.getMetricAsLongOrFail(RpcConstants.RPC_SERVER_RESPONSE_BYTES));
+    assertEquals(1128 + 865,
+        record.getMetricAsLongOrFail(RpcConstants.RPC_SERVER_UNCOMPRESSED_RESPONSE_BYTES));
+    assertEquals(34 + 154, record.getMetricAsLongOrFail(RpcConstants.RPC_SERVER_REQUEST_BYTES));
+    assertEquals(67 + 552,
+        record.getMetricAsLongOrFail(RpcConstants.RPC_SERVER_UNCOMPRESSED_REQUEST_BYTES));
+    assertEquals(100 + 16 + 24,
+        record.getMetricAsLongOrFail(RpcConstants.RPC_SERVER_SERVER_LATENCY));
+  }
+
+  @Test
+  public void serverBasicTracingNoHeaders() {
+    ServerStreamTracer.Factory tracerFactory = censusTracing.getServerTracerFactory();
+    ServerStreamTracer tracer =
+        tracerFactory.newServerStreamTracer(method.getFullMethodName(), new Metadata());
+    verifyZeroInteractions(mockTracingPropagationHandler);
+    verify(mockSpanFactory).startSpanWithRemoteParent(
+        isNull(SpanContext.class), eq("Recv.package1.service2.method3"),
+        any(StartSpanOptions.class));
+
+    Context filteredContext = tracer.filterContext(Context.ROOT);
+    assertSame(spyServerSpan, ContextUtils.CONTEXT_SPAN_KEY.get(filteredContext));
+
+    verify(spyServerSpan, never()).end(any(EndSpanOptions.class));
+    tracer.streamClosed(Status.CANCELLED);
+
+    verify(spyServerSpan).end(
+        EndSpanOptions.builder()
+            .setStatus(com.google.instrumentation.trace.Status.CANCELLED).build());
+    verify(spyServerSpan, never()).end();
+  }
+
+  @Test
+  public void convertToTracingStatus() {
+    // Without description
+    for (Status.Code grpcCode : Status.Code.values()) {
+      Status grpcStatus = Status.fromCode(grpcCode);
+      com.google.instrumentation.trace.Status tracingStatus =
+          CensusTracingModule.convertStatus(grpcStatus);
+      assertEquals(grpcCode.toString(), tracingStatus.getCanonicalCode().toString());
+      assertNull(tracingStatus.getDescription());
+    }
+
+    // With description
+    for (Status.Code grpcCode : Status.Code.values()) {
+      Status grpcStatus = Status.fromCode(grpcCode).withDescription("This is my description");
+      com.google.instrumentation.trace.Status tracingStatus =
+          CensusTracingModule.convertStatus(grpcStatus);
+      assertEquals(grpcCode.toString(), tracingStatus.getCanonicalCode().toString());
+      assertEquals(grpcStatus.getDescription(), tracingStatus.getDescription());
+    }
+  }
+
+  private static void assertNoServerContent(StatsTestUtils.MetricsRecord record) {
+    assertNull(record.getMetric(RpcConstants.RPC_SERVER_ERROR_COUNT));
+    assertNull(record.getMetric(RpcConstants.RPC_SERVER_REQUEST_BYTES));
+    assertNull(record.getMetric(RpcConstants.RPC_SERVER_RESPONSE_BYTES));
+    assertNull(record.getMetric(RpcConstants.RPC_SERVER_SERVER_ELAPSED_TIME));
+    assertNull(record.getMetric(RpcConstants.RPC_SERVER_SERVER_LATENCY));
+    assertNull(record.getMetric(RpcConstants.RPC_SERVER_UNCOMPRESSED_REQUEST_BYTES));
+    assertNull(record.getMetric(RpcConstants.RPC_SERVER_UNCOMPRESSED_RESPONSE_BYTES));
+  }
+
+  private static void assertNoClientContent(StatsTestUtils.MetricsRecord record) {
+    assertNull(record.getMetric(RpcConstants.RPC_CLIENT_ERROR_COUNT));
+    assertNull(record.getMetric(RpcConstants.RPC_CLIENT_REQUEST_BYTES));
+    assertNull(record.getMetric(RpcConstants.RPC_CLIENT_RESPONSE_BYTES));
+    assertNull(record.getMetric(RpcConstants.RPC_CLIENT_ROUNDTRIP_LATENCY));
+    assertNull(record.getMetric(RpcConstants.RPC_CLIENT_SERVER_ELAPSED_TIME));
+    assertNull(record.getMetric(RpcConstants.RPC_CLIENT_UNCOMPRESSED_REQUEST_BYTES));
+    assertNull(record.getMetric(RpcConstants.RPC_CLIENT_UNCOMPRESSED_RESPONSE_BYTES));
+  }
+
+  // Promote the visibility of SpanFactory's methods to allow mocking
+  private abstract static class AccessibleSpanFactory extends SpanFactory {
+    @Override
+    public abstract Span startSpan(@Nullable Span parent, String name, StartSpanOptions options);
+
+    @Override
+    public abstract Span startSpanWithRemoteParent(
+        @Nullable SpanContext remoteParent, String name, StartSpanOptions options);
+  }
+
+  private static class FakeSpan extends Span {
+    FakeSpan(SpanContext ctx) {
+      super(ctx, null);
+    }
+
+    @Override
+    public void addAttributes(Map<String, AttributeValue> attributes) {
+    }
+
+    @Override
+    public void addAnnotation(String description, Map<String, AttributeValue> attributes) {
+    }
+
+    @Override
+    public void addAnnotation(Annotation annotation) {
+    }
+
+    @Override
+    public void addNetworkEvent(NetworkEvent networkEvent) {
+    }
+
+    @Override
+    public void addLink(Link link) {
+    }
+
+    @Override
+    public void end(EndSpanOptions options) {
+    }
+  }
+}
diff --git a/core/src/test/java/io/grpc/internal/CensusStreamTracerModuleTest.java b/core/src/test/java/io/grpc/internal/CensusStreamTracerModuleTest.java
deleted file mode 100644
index 764141f..0000000
--- a/core/src/test/java/io/grpc/internal/CensusStreamTracerModuleTest.java
+++ /dev/null
@@ -1,370 +0,0 @@
-/*
- * Copyright 2017, Google Inc. All rights reserved.
- *
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions are
- * met:
- *
- *    * Redistributions of source code must retain the above copyright
- * notice, this list of conditions and the following disclaimer.
- *    * Redistributions in binary form must reproduce the above
- * copyright notice, this list of conditions and the following disclaimer
- * in the documentation and/or other materials provided with the
- * distribution.
- *
- *    * Neither the name of Google Inc. nor the names of its
- * contributors may be used to endorse or promote products derived from
- * this software without specific prior written permission.
- *
- * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
- * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
- * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
- * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
- * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
- * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
- * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
- * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
- * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
- * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
- * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
- */
-
-package io.grpc.internal;
-
-import static java.util.concurrent.TimeUnit.MILLISECONDS;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.same;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-
-import com.google.instrumentation.stats.RpcConstants;
-import com.google.instrumentation.stats.StatsContext;
-import com.google.instrumentation.stats.TagValue;
-import io.grpc.CallOptions;
-import io.grpc.Channel;
-import io.grpc.ClientCall;
-import io.grpc.ClientInterceptor;
-import io.grpc.ClientStreamTracer;
-import io.grpc.Context;
-import io.grpc.Metadata;
-import io.grpc.MethodDescriptor.MethodType;
-import io.grpc.MethodDescriptor;
-import io.grpc.ServerStreamTracer;
-import io.grpc.Status;
-import io.grpc.internal.CensusStreamTracerModule.ClientCallTracer;
-import io.grpc.internal.testing.StatsTestUtils;
-import io.grpc.internal.testing.StatsTestUtils.FakeStatsContextFactory;
-import io.grpc.testing.TestMethodDescriptors;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-import org.mockito.ArgumentCaptor;
-import org.mockito.Captor;
-import org.mockito.Mock;
-import org.mockito.MockitoAnnotations;
-
-/**
- * Test for {@link StatsTraceContext}.
- */
-@RunWith(JUnit4.class)
-public class CensusStreamTracerModuleTest {
-  private static final CallOptions.Key<String> CUSTOM_OPTION =
-      CallOptions.Key.of("option1", "default");
-  private static final CallOptions CALL_OPTIONS =
-      CallOptions.DEFAULT.withOption(CUSTOM_OPTION, "customvalue");
-
-  private final FakeClock fakeClock = new FakeClock();
-  private final FakeStatsContextFactory statsCtxFactory = new FakeStatsContextFactory();
-  private final CensusStreamTracerModule census =
-      new CensusStreamTracerModule(statsCtxFactory, fakeClock.getStopwatchSupplier());
-
-  @Mock
-  private Channel mockChannel;
-  @Mock
-  private ClientCall<Void, Void> mockClientCall;
-  @Mock
-  private ClientCall.Listener<Void> mockClientCallListener;
-  @Captor
-  private ArgumentCaptor<CallOptions> callOptionsCaptor;
-  @Captor
-  private ArgumentCaptor<ClientCall.Listener<Void>> clientCallListenerCaptor;
-
-  @Before
-  @SuppressWarnings("unchecked")
-  public void setUp() {
-    MockitoAnnotations.initMocks(this);
-    when(mockChannel.newCall(any(MethodDescriptor.class), any(CallOptions.class)))
-        .thenReturn(mockClientCall);
-  }
-
-  @After
-  public void wrapUp() {
-    assertNull(statsCtxFactory.pollRecord());
-  }
-
-  @Test
-  public void clientInterceptorNoCustomTag() {
-    testClientInterceptor(false);
-  }
-
-  @Test
-  public void clientInterceptorCustomTag() {
-    testClientInterceptor(true);
-  }
-
-  private void testClientInterceptor(boolean customTag) {
-    String methodName = MethodDescriptor.generateFullMethodName("Service1", "method1");
-    ClientInterceptor interceptor = census.getClientInterceptor();
-    MethodDescriptor<Void, Void> method = MethodDescriptor.<Void, Void>newBuilder()
-        .setType(MethodType.UNARY)
-        .setFullMethodName(methodName)
-        .setRequestMarshaller(TestMethodDescriptors.voidMarshaller())
-        .setResponseMarshaller(TestMethodDescriptors.voidMarshaller())
-        .build();
-
-    ClientCall<Void, Void> call;
-    if (customTag) {
-      Context ctx =
-          Context.ROOT.withValue(
-              CensusStreamTracerModule.STATS_CONTEXT_KEY,
-              statsCtxFactory.getDefault().with(
-                  StatsTestUtils.EXTRA_TAG, TagValue.create("extra value")));
-      Context origCtx = ctx.attach();
-      try {
-        call = interceptor.interceptCall(method, CALL_OPTIONS, mockChannel);
-      } finally {
-        ctx.detach(origCtx);
-      }
-    } else {
-      assertNull(CensusStreamTracerModule.STATS_CONTEXT_KEY.get());
-      call = interceptor.interceptCall(method, CALL_OPTIONS, mockChannel);
-    }
-
-    // The interceptor adds tracer factory to CallOptions
-    verify(mockChannel).newCall(same(method), callOptionsCaptor.capture());
-    CallOptions capturedCallOptions = callOptionsCaptor.getValue();
-    assertEquals("customvalue", capturedCallOptions.getOption(CUSTOM_OPTION));
-    assertEquals(1, capturedCallOptions.getStreamTracerFactories().size());
-    assertTrue(capturedCallOptions.getStreamTracerFactories().get(0) instanceof ClientCallTracer);
-
-    // Start the call
-    Metadata headers = new Metadata();
-    call.start(mockClientCallListener, headers);
-
-    verify(mockClientCall).start(clientCallListenerCaptor.capture(), same(headers));
-    assertNull(statsCtxFactory.pollRecord());
-
-    // Then listener receives onClose()
-    Status status = Status.CANCELLED.withDescription("I am just doing it");
-    Metadata trailers = new Metadata();
-    clientCallListenerCaptor.getValue().onClose(status, trailers);
-
-    // The intercepting listener will call callEnded() on ClientTracerFactory, which records to
-    // Census.
-    verify(mockClientCallListener).onClose(same(status), same(trailers));
-    StatsTestUtils.MetricsRecord record = statsCtxFactory.pollRecord();
-    assertNotNull(record);
-    TagValue methodTag = record.tags.get(RpcConstants.RPC_CLIENT_METHOD);
-    assertEquals(methodName, methodTag.toString());
-    TagValue statusTag = record.tags.get(RpcConstants.RPC_STATUS);
-    assertEquals(Status.Code.CANCELLED.toString(), statusTag.toString());
-    if (customTag) {
-      TagValue extraTag = record.tags.get(StatsTestUtils.EXTRA_TAG);
-      assertEquals("extra value", extraTag.toString());
-    } else {
-      assertNull(record.tags.get(StatsTestUtils.EXTRA_TAG));
-    }
-  }
-
-  @Test
-  public void clientBasicStats() {
-    String methodName = MethodDescriptor.generateFullMethodName("Service1", "method1");
-    ClientCallTracer callTracer =
-        census.newClientCallTracer(statsCtxFactory.getDefault(), methodName);
-    Metadata headers = new Metadata();
-    ClientStreamTracer tracer = callTracer.newClientStreamTracer(headers);
-
-    fakeClock.forwardTime(30, MILLISECONDS);
-    tracer.outboundHeaders();
-
-    fakeClock.forwardTime(100, MILLISECONDS);
-    tracer.outboundWireSize(1028);
-    tracer.outboundUncompressedSize(1128);
-
-    fakeClock.forwardTime(16, MILLISECONDS);
-    tracer.inboundWireSize(33);
-    tracer.inboundUncompressedSize(67);
-    tracer.outboundWireSize(99);
-    tracer.outboundUncompressedSize(865);
-
-    fakeClock.forwardTime(24, MILLISECONDS);
-    tracer.inboundWireSize(154);
-    tracer.inboundUncompressedSize(552);
-    tracer.streamClosed(Status.OK);
-    callTracer.callEnded(Status.OK);
-
-    StatsTestUtils.MetricsRecord record = statsCtxFactory.pollRecord();
-    assertNotNull(record);
-    assertNoServerContent(record);
-    TagValue methodTag = record.tags.get(RpcConstants.RPC_CLIENT_METHOD);
-    assertEquals(methodName, methodTag.toString());
-    TagValue statusTag = record.tags.get(RpcConstants.RPC_STATUS);
-    assertEquals(Status.Code.OK.toString(), statusTag.toString());
-    assertNull(record.getMetric(RpcConstants.RPC_CLIENT_ERROR_COUNT));
-    assertEquals(1028 + 99, record.getMetricAsLongOrFail(RpcConstants.RPC_CLIENT_REQUEST_BYTES));
-    assertEquals(1128 + 865,
-        record.getMetricAsLongOrFail(RpcConstants.RPC_CLIENT_UNCOMPRESSED_REQUEST_BYTES));
-    assertEquals(33 + 154, record.getMetricAsLongOrFail(RpcConstants.RPC_CLIENT_RESPONSE_BYTES));
-    assertEquals(67 + 552,
-        record.getMetricAsLongOrFail(RpcConstants.RPC_CLIENT_UNCOMPRESSED_RESPONSE_BYTES));
-    assertEquals(30 + 100 + 16 + 24,
-        record.getMetricAsLongOrFail(RpcConstants.RPC_CLIENT_ROUNDTRIP_LATENCY));
-  }
-
-  @Test
-  public void clientStreamNeverCreated() {
-    String methodName = MethodDescriptor.generateFullMethodName("Service1", "method2");
-    ClientCallTracer callTracer =
-        census.newClientCallTracer(statsCtxFactory.getDefault(), methodName);
-
-    fakeClock.forwardTime(3000, MILLISECONDS);
-    callTracer.callEnded(Status.DEADLINE_EXCEEDED.withDescription("3 seconds"));
-
-    StatsTestUtils.MetricsRecord record = statsCtxFactory.pollRecord();
-    assertNotNull(record);
-    assertNoServerContent(record);
-    TagValue methodTag = record.tags.get(RpcConstants.RPC_CLIENT_METHOD);
-    assertEquals(methodName, methodTag.toString());
-    TagValue statusTag = record.tags.get(RpcConstants.RPC_STATUS);
-    assertEquals(Status.Code.DEADLINE_EXCEEDED.toString(), statusTag.toString());
-    assertEquals(1, record.getMetricAsLongOrFail(RpcConstants.RPC_CLIENT_ERROR_COUNT));
-    assertEquals(0, record.getMetricAsLongOrFail(RpcConstants.RPC_CLIENT_REQUEST_BYTES));
-    assertEquals(0,
-        record.getMetricAsLongOrFail(RpcConstants.RPC_CLIENT_UNCOMPRESSED_REQUEST_BYTES));
-    assertEquals(0, record.getMetricAsLongOrFail(RpcConstants.RPC_CLIENT_RESPONSE_BYTES));
-    assertEquals(0,
-        record.getMetricAsLongOrFail(RpcConstants.RPC_CLIENT_UNCOMPRESSED_RESPONSE_BYTES));
-    assertEquals(3000, record.getMetricAsLongOrFail(RpcConstants.RPC_CLIENT_ROUNDTRIP_LATENCY));
-    assertNull(record.getMetric(RpcConstants.RPC_CLIENT_SERVER_ELAPSED_TIME));
-  }
-
-  @Test
-  public void tagPropagation() {
-    String methodName = MethodDescriptor.generateFullMethodName("Service1", "method3");
-
-    // EXTRA_TAG is propagated by the FakeStatsContextFactory. Note that not all tags are
-    // propagated.  The StatsContextFactory decides which tags are to propagated.  gRPC facilitates
-    // the propagation by putting them in the headers.
-    StatsContext clientCtx = statsCtxFactory.getDefault().with(
-        StatsTestUtils.EXTRA_TAG, TagValue.create("extra-tag-value-897"));
-    ClientCallTracer callTracer = census.newClientCallTracer(clientCtx, methodName);
-    Metadata headers = new Metadata();
-    // This propagates clientCtx to headers
-    callTracer.newClientStreamTracer(headers);
-
-    ServerStreamTracer serverTracer =
-        census.getServerTracerFactory().newServerStreamTracer(methodName, headers);
-    // Server tracer deserializes clientCtx from the headers, so that it records stats with the
-    // propagated tags.
-    Context serverContext = serverTracer.filterContext(Context.ROOT);
-    // It also put clientCtx in the Context seen by the call handler
-    assertEquals(clientCtx, CensusStreamTracerModule.STATS_CONTEXT_KEY.get(serverContext));
-
-
-    // Verifies that the server tracer records the status with the propagated tag
-    serverTracer.streamClosed(Status.OK);
-
-    StatsTestUtils.MetricsRecord serverRecord = statsCtxFactory.pollRecord();
-    assertNotNull(serverRecord);
-    assertNoClientContent(serverRecord);
-    TagValue serverMethodTag = serverRecord.tags.get(RpcConstants.RPC_SERVER_METHOD);
-    assertEquals(methodName, serverMethodTag.toString());
-    TagValue serverStatusTag = serverRecord.tags.get(RpcConstants.RPC_STATUS);
-    assertEquals(Status.Code.OK.toString(), serverStatusTag.toString());
-    assertNull(serverRecord.getMetric(RpcConstants.RPC_SERVER_ERROR_COUNT));
-    TagValue serverPropagatedTag = serverRecord.tags.get(StatsTestUtils.EXTRA_TAG);
-    assertEquals("extra-tag-value-897", serverPropagatedTag.toString());
-
-    // Verifies that the client tracer factory uses clientCtx, which includes the custom tags, to
-    // record stats.
-    callTracer.callEnded(Status.OK);
-
-    StatsTestUtils.MetricsRecord clientRecord = statsCtxFactory.pollRecord();
-    assertNotNull(clientRecord);
-    assertNoServerContent(clientRecord);
-    TagValue clientMethodTag = clientRecord.tags.get(RpcConstants.RPC_CLIENT_METHOD);
-    assertEquals(methodName, clientMethodTag.toString());
-    TagValue clientStatusTag = clientRecord.tags.get(RpcConstants.RPC_STATUS);
-    assertEquals(Status.Code.OK.toString(), clientStatusTag.toString());
-    assertNull(clientRecord.getMetric(RpcConstants.RPC_CLIENT_ERROR_COUNT));
-    TagValue clientPropagatedTag = clientRecord.tags.get(StatsTestUtils.EXTRA_TAG);
-    assertEquals("extra-tag-value-897", clientPropagatedTag.toString());
-  }
-
-  @Test
-  public void serverBasicStats() {
-    String methodName = MethodDescriptor.generateFullMethodName("Service1", "method4");
-
-    ServerStreamTracer.Factory tracerFactory = census.getServerTracerFactory();
-    ServerStreamTracer tracer = tracerFactory.newServerStreamTracer(methodName, new Metadata());
-
-    tracer.inboundWireSize(34);
-    tracer.inboundUncompressedSize(67);
-
-    fakeClock.forwardTime(100, MILLISECONDS);
-    tracer.outboundWireSize(1028);
-    tracer.outboundUncompressedSize(1128);
-
-    fakeClock.forwardTime(16, MILLISECONDS);
-    tracer.inboundWireSize(154);
-    tracer.inboundUncompressedSize(552);
-    tracer.outboundWireSize(99);
-    tracer.outboundUncompressedSize(865);
-
-    fakeClock.forwardTime(24, MILLISECONDS);
-    tracer.streamClosed(Status.CANCELLED);
-
-    StatsTestUtils.MetricsRecord record = statsCtxFactory.pollRecord();
-    assertNotNull(record);
-    assertNoClientContent(record);
-    TagValue methodTag = record.tags.get(RpcConstants.RPC_SERVER_METHOD);
-    assertEquals(methodName, methodTag.toString());
-    TagValue statusTag = record.tags.get(RpcConstants.RPC_STATUS);
-    assertEquals(Status.Code.CANCELLED.toString(), statusTag.toString());
-    assertEquals(1, record.getMetricAsLongOrFail(RpcConstants.RPC_SERVER_ERROR_COUNT));
-    assertEquals(1028 + 99, record.getMetricAsLongOrFail(RpcConstants.RPC_SERVER_RESPONSE_BYTES));
-    assertEquals(1128 + 865,
-        record.getMetricAsLongOrFail(RpcConstants.RPC_SERVER_UNCOMPRESSED_RESPONSE_BYTES));
-    assertEquals(34 + 154, record.getMetricAsLongOrFail(RpcConstants.RPC_SERVER_REQUEST_BYTES));
-    assertEquals(67 + 552,
-        record.getMetricAsLongOrFail(RpcConstants.RPC_SERVER_UNCOMPRESSED_REQUEST_BYTES));
-    assertEquals(100 + 16 + 24,
-        record.getMetricAsLongOrFail(RpcConstants.RPC_SERVER_SERVER_LATENCY));
-  }
-
-  private static void assertNoServerContent(StatsTestUtils.MetricsRecord record) {
-    assertNull(record.getMetric(RpcConstants.RPC_SERVER_ERROR_COUNT));
-    assertNull(record.getMetric(RpcConstants.RPC_SERVER_REQUEST_BYTES));
-    assertNull(record.getMetric(RpcConstants.RPC_SERVER_RESPONSE_BYTES));
-    assertNull(record.getMetric(RpcConstants.RPC_SERVER_SERVER_ELAPSED_TIME));
-    assertNull(record.getMetric(RpcConstants.RPC_SERVER_SERVER_LATENCY));
-    assertNull(record.getMetric(RpcConstants.RPC_SERVER_UNCOMPRESSED_REQUEST_BYTES));
-    assertNull(record.getMetric(RpcConstants.RPC_SERVER_UNCOMPRESSED_RESPONSE_BYTES));
-  }
-
-  private static void assertNoClientContent(StatsTestUtils.MetricsRecord record) {
-    assertNull(record.getMetric(RpcConstants.RPC_CLIENT_ERROR_COUNT));
-    assertNull(record.getMetric(RpcConstants.RPC_CLIENT_REQUEST_BYTES));
-    assertNull(record.getMetric(RpcConstants.RPC_CLIENT_RESPONSE_BYTES));
-    assertNull(record.getMetric(RpcConstants.RPC_CLIENT_ROUNDTRIP_LATENCY));
-    assertNull(record.getMetric(RpcConstants.RPC_CLIENT_SERVER_ELAPSED_TIME));
-    assertNull(record.getMetric(RpcConstants.RPC_CLIENT_UNCOMPRESSED_REQUEST_BYTES));
-    assertNull(record.getMetric(RpcConstants.RPC_CLIENT_UNCOMPRESSED_RESPONSE_BYTES));
-  }
-}