core: stop using static flags for Census control. (#2947)

Static mutable flags are evil.  Turned them to options on channel
builder.  Also restore the local stats recording by default, because
these flags were added with the concern of wire-compatibility, but not
local feature.
diff --git a/core/src/main/java/io/grpc/internal/AbstractManagedChannelImplBuilder.java b/core/src/main/java/io/grpc/internal/AbstractManagedChannelImplBuilder.java
index c97fffc..5f4b443 100644
--- a/core/src/main/java/io/grpc/internal/AbstractManagedChannelImplBuilder.java
+++ b/core/src/main/java/io/grpc/internal/AbstractManagedChannelImplBuilder.java
@@ -120,6 +120,9 @@
 
   private int maxInboundMessageSize = GrpcUtil.DEFAULT_MAX_MESSAGE_SIZE;
 
+  private boolean enableStatsTagPropagation;
+  private boolean enableTracing;
+
   // Can be overriden by subclasses.
   @Override
   public T maxInboundMessageSize(int max) {
@@ -273,6 +276,24 @@
     return GrpcUtil.checkAuthority(authority);
   }
 
+  /**
+   * Set it to true to propagate the stats tags on the wire.  This will be deleted assuming always
+   * enabled once the instrumentation-java wire format is stabilized.
+   */
+  @Deprecated
+  public void setEnableStatsTagPropagation(boolean enabled) {
+    this.enableStatsTagPropagation = enabled;
+  }
+
+  /**
+   * Set it to true to record traces and propagate tracing information on the wire.  This will be
+   * deleted assuming always enabled once the instrumentation-java wire format is stabilized.
+   */
+  @Deprecated
+  public void setEnableTracing(boolean enabled) {
+    this.enableTracing = enabled;
+  }
+
   @Override
   public ManagedChannel build() {
     ClientTransportFactory transportFactory = buildTransportFactory();
@@ -290,18 +311,19 @@
 
     List<ClientInterceptor> effectiveInterceptors =
         new ArrayList<ClientInterceptor>(this.interceptors);
-    if (GrpcUtil.enableCensusStats && recordsStats()) {
+    if (recordsStats()) {
       StatsContextFactory statsCtxFactory =
           this.statsFactory != null ? this.statsFactory : Stats.getStatsContextFactory();
       if (statsCtxFactory != null) {
         CensusStatsModule censusStats =
-            new CensusStatsModule(statsCtxFactory, GrpcUtil.STOPWATCH_SUPPLIER);
+            new CensusStatsModule(
+                statsCtxFactory, GrpcUtil.STOPWATCH_SUPPLIER, enableStatsTagPropagation);
         // First interceptor runs last (see ClientInterceptors.intercept()), so that no
         // other interceptor can override the tracer factory we set in CallOptions.
         effectiveInterceptors.add(0, censusStats.getClientInterceptor());
       }
     }
-    if (GrpcUtil.enableCensusTracing) {
+    if (enableTracing) {
       CensusTracingModule censusTracing =
           new CensusTracingModule(Tracing.getTracer(), Tracing.getBinaryPropagationHandler());
       effectiveInterceptors.add(0, censusTracing.getClientInterceptor());
diff --git a/core/src/main/java/io/grpc/internal/AbstractServerImplBuilder.java b/core/src/main/java/io/grpc/internal/AbstractServerImplBuilder.java
index fbeb147..af3dd0e 100644
--- a/core/src/main/java/io/grpc/internal/AbstractServerImplBuilder.java
+++ b/core/src/main/java/io/grpc/internal/AbstractServerImplBuilder.java
@@ -173,20 +173,17 @@
   public ServerImpl build() {
     ArrayList<ServerStreamTracer.Factory> tracerFactories =
         new ArrayList<ServerStreamTracer.Factory>();
-    if (GrpcUtil.enableCensusStats) {
-      StatsContextFactory statsFactory =
-          this.statsFactory != null ? this.statsFactory : Stats.getStatsContextFactory();
-      if (statsFactory != null) {
-        CensusStatsModule censusStats =
-            new CensusStatsModule(statsFactory, GrpcUtil.STOPWATCH_SUPPLIER);
-        tracerFactories.add(censusStats.getServerTracerFactory());
-      }
+    StatsContextFactory statsFactory =
+        this.statsFactory != null ? this.statsFactory : Stats.getStatsContextFactory();
+    if (statsFactory != null) {
+      CensusStatsModule censusStats =
+          new CensusStatsModule(
+              statsFactory, GrpcUtil.STOPWATCH_SUPPLIER, true /** only matters on client-side **/);
+      tracerFactories.add(censusStats.getServerTracerFactory());
     }
-    if (GrpcUtil.enableCensusTracing) {
-      CensusTracingModule censusTracing =
-          new CensusTracingModule(Tracing.getTracer(), Tracing.getBinaryPropagationHandler());
-      tracerFactories.add(censusTracing.getServerTracerFactory());
-    }
+    CensusTracingModule censusTracing =
+        new CensusTracingModule(Tracing.getTracer(), Tracing.getBinaryPropagationHandler());
+    tracerFactories.add(censusTracing.getServerTracerFactory());
     tracerFactories.addAll(streamTracerFactories);
 
     io.grpc.internal.InternalServer transportServer =
diff --git a/core/src/main/java/io/grpc/internal/CensusStatsModule.java b/core/src/main/java/io/grpc/internal/CensusStatsModule.java
index ac05e88..19cce81 100644
--- a/core/src/main/java/io/grpc/internal/CensusStatsModule.java
+++ b/core/src/main/java/io/grpc/internal/CensusStatsModule.java
@@ -94,11 +94,14 @@
   final Metadata.Key<StatsContext> statsHeader;
   private final StatsClientInterceptor clientInterceptor = new StatsClientInterceptor();
   private final ServerTracerFactory serverTracerFactory = new ServerTracerFactory();
+  private final boolean propagateTags;
 
   CensusStatsModule(
-      final StatsContextFactory statsCtxFactory, Supplier<Stopwatch> stopwatchSupplier) {
+      final StatsContextFactory statsCtxFactory, Supplier<Stopwatch> stopwatchSupplier,
+      boolean propagateTags) {
     this.statsCtxFactory = checkNotNull(statsCtxFactory, "statsCtxFactory");
     this.stopwatchSupplier = checkNotNull(stopwatchSupplier, "stopwatchSupplier");
+    this.propagateTags = propagateTags;
     this.statsHeader =
         Metadata.Key.of("grpc-tags-bin", new Metadata.BinaryMarshaller<StatsContext>() {
             @Override
@@ -197,9 +200,11 @@
       // one streams.  We will need to update this file to support them.
       checkState(streamTracer.compareAndSet(null, tracer),
           "Are you creating multiple streams per call? This class doesn't yet support this case.");
-      headers.discardAll(statsHeader);
-      if (parentCtx != statsCtxFactory.getDefault()) {
-        headers.put(statsHeader, parentCtx);
+      if (propagateTags) {
+        headers.discardAll(statsHeader);
+        if (parentCtx != statsCtxFactory.getDefault()) {
+          headers.put(statsHeader, parentCtx);
+        }
       }
       return tracer;
     }
diff --git a/core/src/main/java/io/grpc/internal/GrpcUtil.java b/core/src/main/java/io/grpc/internal/GrpcUtil.java
index 4da7be6..0ffd05b 100644
--- a/core/src/main/java/io/grpc/internal/GrpcUtil.java
+++ b/core/src/main/java/io/grpc/internal/GrpcUtil.java
@@ -203,20 +203,6 @@
   public static final long SERVER_KEEPALIVE_TIME_NANOS_DISABLED = Long.MAX_VALUE;
 
   /**
-   * Whether the channel builder and server builder will try to load and use Census stats library.
-   * Delete this and assume always on once Census stats has been fully tested and its wire-format is
-   * stabilized.
-   */
-  public static boolean enableCensusStats;
-
-  /**
-   * Whether the channel builder and server builder will try to load and use Census tracing library.
-   * Delete this and assume always on once Census stats has been fully tested and its wire-format is
-   * stabilized.
-   */
-  public static boolean enableCensusTracing;
-
-  /**
    * Maps HTTP error response status codes to transport codes, as defined in <a
    * href="https://github.com/grpc/grpc/blob/master/doc/http-grpc-status-mapping.md">
    * http-grpc-status-mapping.md</a>. Never returns a status for which {@code status.isOk()} is
diff --git a/core/src/test/java/io/grpc/internal/CensusModulesTest.java b/core/src/test/java/io/grpc/internal/CensusModulesTest.java
index 23e3a00..ebe0def 100644
--- a/core/src/test/java/io/grpc/internal/CensusModulesTest.java
+++ b/core/src/test/java/io/grpc/internal/CensusModulesTest.java
@@ -210,7 +210,7 @@
     when(mockTracingPropagationHandler.fromBinaryValue(any(byte[].class)))
         .thenReturn(fakeServerParentSpanContext);
     tracer = new Tracer(mockSpanFactory) {};
-    censusStats = new CensusStatsModule(statsCtxFactory, fakeClock.getStopwatchSupplier());
+    censusStats = new CensusStatsModule(statsCtxFactory, fakeClock.getStopwatchSupplier(), true);
     censusTracing = new CensusTracingModule(tracer, mockTracingPropagationHandler);
   }
 
@@ -445,20 +445,36 @@
 
   @Test
   public void statsHeadersPropagateTags() {
+    subtestStatsHeadersPropagateTags(true);
+  }
+
+  @Test
+  public void statsHeadersNotPropagateTags() {
+    subtestStatsHeadersPropagateTags(false);
+  }
+
+  private void subtestStatsHeadersPropagateTags(boolean propagate) {
     // EXTRA_TAG is propagated by the FakeStatsContextFactory. Note that not all tags are
     // propagated.  The StatsContextFactory decides which tags are to propagated.  gRPC facilitates
     // the propagation by putting them in the headers.
     StatsContext clientCtx = statsCtxFactory.getDefault().with(
         StatsTestUtils.EXTRA_TAG, TagValue.create("extra-tag-value-897"));
-    CensusStatsModule.ClientCallTracer callTracer =
-        censusStats.newClientCallTracer(clientCtx, method.getFullMethodName());
+    CensusStatsModule census =
+        new CensusStatsModule(statsCtxFactory, fakeClock.getStopwatchSupplier(), propagate);
     Metadata headers = new Metadata();
-    // This propagates clientCtx to headers
+    CensusStatsModule.ClientCallTracer callTracer =
+        census.newClientCallTracer(clientCtx, method.getFullMethodName());
+    // This propagates clientCtx to headers if propagates==true
     callTracer.newClientStreamTracer(headers);
-    assertTrue(headers.containsKey(censusStats.statsHeader));
+    if (propagate) {
+      assertTrue(headers.containsKey(census.statsHeader));
+    } else {
+      assertFalse(headers.containsKey(census.statsHeader));
+      return;
+    }
 
     ServerStreamTracer serverTracer =
-        censusStats.getServerTracerFactory().newServerStreamTracer(
+        census.getServerTracerFactory().newServerStreamTracer(
             method.getFullMethodName(), headers);
     // Server tracer deserializes clientCtx from the headers, so that it records stats with the
     // propagated tags.
@@ -466,7 +482,6 @@
     // It also put clientCtx in the Context seen by the call handler
     assertEquals(clientCtx, CensusStatsModule.STATS_CONTEXT_KEY.get(serverContext));
 
-
     // Verifies that the server tracer records the status with the propagated tag
     serverTracer.streamClosed(Status.OK);
 
diff --git a/interop-testing/src/test/java/io/grpc/testing/integration/AutoWindowSizingOnTest.java b/interop-testing/src/test/java/io/grpc/testing/integration/AutoWindowSizingOnTest.java
index ca8b08f..a490712 100644
--- a/interop-testing/src/test/java/io/grpc/testing/integration/AutoWindowSizingOnTest.java
+++ b/interop-testing/src/test/java/io/grpc/testing/integration/AutoWindowSizingOnTest.java
@@ -32,7 +32,6 @@
 package io.grpc.testing.integration;
 
 import io.grpc.ManagedChannel;
-import io.grpc.internal.GrpcUtil;
 import io.grpc.netty.HandlerSettings;
 import io.grpc.netty.NegotiationType;
 import io.grpc.netty.NettyChannelBuilder;
@@ -47,7 +46,6 @@
 
   @BeforeClass
   public static void turnOnAutoWindow() {
-    GrpcUtil.enableCensusStats = true;
     HandlerSettings.enable(true);
     HandlerSettings.autoWindowOn(true);
     startStaticServer(
diff --git a/interop-testing/src/test/java/io/grpc/testing/integration/Http2NettyLocalChannelTest.java b/interop-testing/src/test/java/io/grpc/testing/integration/Http2NettyLocalChannelTest.java
index eb26f8a..05d3c8f 100644
--- a/interop-testing/src/test/java/io/grpc/testing/integration/Http2NettyLocalChannelTest.java
+++ b/interop-testing/src/test/java/io/grpc/testing/integration/Http2NettyLocalChannelTest.java
@@ -32,7 +32,6 @@
 package io.grpc.testing.integration;
 
 import io.grpc.ManagedChannel;
-import io.grpc.internal.GrpcUtil;
 import io.grpc.netty.NegotiationType;
 import io.grpc.netty.NettyChannelBuilder;
 import io.grpc.netty.NettyServerBuilder;
@@ -53,7 +52,6 @@
   /** Start server. */
   @BeforeClass
   public static void startServer() {
-    GrpcUtil.enableCensusStats = true;
     startStaticServer(
         NettyServerBuilder
             .forAddress(new LocalAddress("in-process-1"))
diff --git a/interop-testing/src/test/java/io/grpc/testing/integration/Http2NettyTest.java b/interop-testing/src/test/java/io/grpc/testing/integration/Http2NettyTest.java
index c52add6..846c584 100644
--- a/interop-testing/src/test/java/io/grpc/testing/integration/Http2NettyTest.java
+++ b/interop-testing/src/test/java/io/grpc/testing/integration/Http2NettyTest.java
@@ -32,7 +32,6 @@
 package io.grpc.testing.integration;
 
 import io.grpc.ManagedChannel;
-import io.grpc.internal.GrpcUtil;
 import io.grpc.netty.GrpcSslContexts;
 import io.grpc.netty.NettyChannelBuilder;
 import io.grpc.netty.NettyServerBuilder;
@@ -56,7 +55,6 @@
   /** Starts the server with HTTPS. */
   @BeforeClass
   public static void startServer() {
-    GrpcUtil.enableCensusStats = true;
     try {
       startStaticServer(NettyServerBuilder.forPort(0)
           .flowControlWindow(65 * 1024)
diff --git a/interop-testing/src/test/java/io/grpc/testing/integration/Http2OkHttpTest.java b/interop-testing/src/test/java/io/grpc/testing/integration/Http2OkHttpTest.java
index bb6f71e..28d4ed1 100644
--- a/interop-testing/src/test/java/io/grpc/testing/integration/Http2OkHttpTest.java
+++ b/interop-testing/src/test/java/io/grpc/testing/integration/Http2OkHttpTest.java
@@ -70,7 +70,6 @@
   /** Starts the server with HTTPS. */
   @BeforeClass
   public static void startServer() throws Exception {
-    GrpcUtil.enableCensusStats = true;
     try {
       SslProvider sslProvider = SslContext.defaultServerProvider();
       if (sslProvider == SslProvider.OPENSSL && !OpenSsl.isAlpnSupported()) {
diff --git a/interop-testing/src/test/java/io/grpc/testing/integration/TransportCompressionTest.java b/interop-testing/src/test/java/io/grpc/testing/integration/TransportCompressionTest.java
index b6a2f1a..55a007e 100644
--- a/interop-testing/src/test/java/io/grpc/testing/integration/TransportCompressionTest.java
+++ b/interop-testing/src/test/java/io/grpc/testing/integration/TransportCompressionTest.java
@@ -95,7 +95,6 @@
   /** Start server. */
   @BeforeClass
   public static void startServer() {
-    GrpcUtil.enableCensusStats = true;
     compressors.register(FZIPPER);
     compressors.register(Codec.Identity.NONE);
     startStaticServer(