Expose compression on ClientCall and Server Call
diff --git a/core/src/main/java/io/grpc/ClientCall.java b/core/src/main/java/io/grpc/ClientCall.java
index fadd54e..f6111a7 100644
--- a/core/src/main/java/io/grpc/ClientCall.java
+++ b/core/src/main/java/io/grpc/ClientCall.java
@@ -183,4 +183,13 @@
   public boolean isReady() {
     return true;
   }
+
+  /**
+   * Enables per-message compression, if an encoding type has been negotiated.  If no message
+   * encoding has been negotiated, this is a no-op.
+   */
+  @ExperimentalApi
+  public void setMessageCompression(boolean enabled) {
+    // noop
+  }
 }
diff --git a/core/src/main/java/io/grpc/DecompressorRegistry.java b/core/src/main/java/io/grpc/DecompressorRegistry.java
index 4ed146e..a11fbf1 100644
--- a/core/src/main/java/io/grpc/DecompressorRegistry.java
+++ b/core/src/main/java/io/grpc/DecompressorRegistry.java
@@ -54,7 +54,7 @@
 public final class DecompressorRegistry {
 
   private static final DecompressorRegistry DEFAULT_INSTANCE = new DecompressorRegistry(
-      new DecompressorInfo(new Codec.Gzip(), false),
+      new DecompressorInfo(new Codec.Gzip(), true),
       new DecompressorInfo(Codec.Identity.NONE, false));
 
   public static DecompressorRegistry getDefaultInstance() {
diff --git a/core/src/main/java/io/grpc/ForwardingClientCall.java b/core/src/main/java/io/grpc/ForwardingClientCall.java
index 8d85a72..5d6f1fe 100644
--- a/core/src/main/java/io/grpc/ForwardingClientCall.java
+++ b/core/src/main/java/io/grpc/ForwardingClientCall.java
@@ -66,6 +66,11 @@
   }
 
   @Override
+  public void setMessageCompression(boolean enabled) {
+    delegate().setMessageCompression(enabled);
+  }
+
+  @Override
   public boolean isReady() {
     return delegate().isReady();
   }
diff --git a/core/src/main/java/io/grpc/ManagedChannelBuilder.java b/core/src/main/java/io/grpc/ManagedChannelBuilder.java
index 5d8ca36..3d74c9e 100644
--- a/core/src/main/java/io/grpc/ManagedChannelBuilder.java
+++ b/core/src/main/java/io/grpc/ManagedChannelBuilder.java
@@ -164,6 +164,22 @@
   public abstract T loadBalancerFactory(LoadBalancer.Factory loadBalancerFactory);
 
   /**
+   * Set the decompression registry for use in the channel.  This is an advanced API call and
+   * shouldn't be used unless you are using custom message encoding.   The default supported
+   * decompressors are in {@code DecompressorRegistry.getDefaultInstance}.
+   */
+  @ExperimentalApi
+  public abstract T decompressorRegistry(DecompressorRegistry registry);
+
+  /**
+   * Set the compression registry for use in the channel.  This is an advanced API call and
+   * shouldn't be used unless you are using custom message encoding.   The default supported
+   * compressors are in {@code CompressorRegistry.getDefaultInstance}.
+   */
+  @ExperimentalApi
+  public abstract T compressorRegistry(CompressorRegistry registry);
+
+  /**
    * Builds a channel using the given parameters.
    */
   public abstract ManagedChannel build();
diff --git a/core/src/main/java/io/grpc/ServerBuilder.java b/core/src/main/java/io/grpc/ServerBuilder.java
index 047dfa0..6af7f53 100644
--- a/core/src/main/java/io/grpc/ServerBuilder.java
+++ b/core/src/main/java/io/grpc/ServerBuilder.java
@@ -88,6 +88,22 @@
   public abstract T useTransportSecurity(File certChain, File privateKey);
 
   /**
+   * Set the decompression registry for use in the channel.  This is an advanced API call and
+   * shouldn't be used unless you are using custom message encoding.   The default supported
+   * decompressors are in {@code DecompressorRegistry.getDefaultInstance}.
+   */
+  @ExperimentalApi
+  public abstract T decompressorRegistry(DecompressorRegistry registry);
+
+  /**
+   * Set the compression registry for use in the channel.  This is an advanced API call and
+   * shouldn't be used unless you are using custom message encoding.   The default supported
+   * compressors are in {@code CompressorRegistry.getDefaultInstance}.
+   */
+  @ExperimentalApi
+  public abstract T compressorRegistry(CompressorRegistry registry);
+
+  /**
    * Builds a server using the given parameters.
    *
    * <p>The returned service will not been started or be bound a port. You will need to start it
diff --git a/core/src/main/java/io/grpc/internal/AbstractManagedChannelImplBuilder.java b/core/src/main/java/io/grpc/internal/AbstractManagedChannelImplBuilder.java
index ea79c4e..a610d6c 100644
--- a/core/src/main/java/io/grpc/internal/AbstractManagedChannelImplBuilder.java
+++ b/core/src/main/java/io/grpc/internal/AbstractManagedChannelImplBuilder.java
@@ -31,11 +31,16 @@
 
 package io.grpc.internal;
 
+import static com.google.common.base.MoreObjects.firstNonNull;
+
 import com.google.common.base.Preconditions;
 import com.google.common.util.concurrent.MoreExecutors;
 
 import io.grpc.Attributes;
 import io.grpc.ClientInterceptor;
+import io.grpc.CompressorRegistry;
+import io.grpc.DecompressorRegistry;
+import io.grpc.ExperimentalApi;
 import io.grpc.LoadBalancer;
 import io.grpc.ManagedChannelBuilder;
 import io.grpc.NameResolver;
@@ -82,6 +87,12 @@
   @Nullable
   private LoadBalancer.Factory loadBalancerFactory;
 
+  @Nullable
+  private DecompressorRegistry decompressorRegistry;
+
+  @Nullable
+  private CompressorRegistry compressorRegistry;
+
   protected AbstractManagedChannelImplBuilder(String target) {
     this.target = Preconditions.checkNotNull(target);
     this.directServerAddress = null;
@@ -133,6 +144,20 @@
     return thisT();
   }
 
+  @Override
+  @ExperimentalApi
+  public final T decompressorRegistry(DecompressorRegistry registry) {
+    this.decompressorRegistry = registry;
+    return thisT();
+  }
+
+  @Override
+  @ExperimentalApi
+  public final T compressorRegistry(CompressorRegistry registry) {
+    this.compressorRegistry = registry;
+    return thisT();
+  }
+
   private T thisT() {
     @SuppressWarnings("unchecked")
     T thisT = (T) this;
@@ -168,12 +193,13 @@
         target,
         // TODO(carl-mastrangelo): Allow clients to pass this in
         new ExponentialBackoffPolicy.Provider(),
-        nameResolverFactory == null ? NameResolverRegistry.getDefaultRegistry()
-            : nameResolverFactory,
+        firstNonNull(nameResolverFactory, NameResolverRegistry.getDefaultRegistry()),
         getNameResolverParams(),
-        loadBalancerFactory == null ? SimpleLoadBalancerFactory.getInstance()
-            : loadBalancerFactory,
-        transportFactory, executor, userAgent, interceptors);
+        firstNonNull(loadBalancerFactory, SimpleLoadBalancerFactory.getInstance()),
+        transportFactory,
+        firstNonNull(decompressorRegistry, DecompressorRegistry.getDefaultInstance()),
+        firstNonNull(compressorRegistry, CompressorRegistry.getDefaultInstance()),
+        executor, userAgent, interceptors);
   }
 
   /**
@@ -186,7 +212,7 @@
   /**
    * Subclasses can override this method to provide additional parameters to {@link
    * NameResolver.Factory#newNameResolver}. The default implementation returns {@link
-   * Attributes.EMPTY}.
+   * Attributes#EMPTY}.
    */
   protected Attributes getNameResolverParams() {
     return Attributes.EMPTY;
diff --git a/core/src/main/java/io/grpc/internal/AbstractServerImplBuilder.java b/core/src/main/java/io/grpc/internal/AbstractServerImplBuilder.java
index 6657734..95df85e 100644
--- a/core/src/main/java/io/grpc/internal/AbstractServerImplBuilder.java
+++ b/core/src/main/java/io/grpc/internal/AbstractServerImplBuilder.java
@@ -34,7 +34,9 @@
 import com.google.common.base.Preconditions;
 import com.google.common.util.concurrent.MoreExecutors;
 
+import io.grpc.CompressorRegistry;
 import io.grpc.Context;
+import io.grpc.DecompressorRegistry;
 import io.grpc.HandlerRegistry;
 import io.grpc.Internal;
 import io.grpc.MutableHandlerRegistry;
@@ -58,6 +60,12 @@
   @Nullable
   private Executor executor;
 
+  @Nullable
+  private DecompressorRegistry decompressorRegistry;
+
+  @Nullable
+  private CompressorRegistry compressorRegistry;
+
   /**
    * Constructs using a given handler registry.
    */
@@ -99,6 +107,26 @@
   }
 
   @Override
+  public final T decompressorRegistry(DecompressorRegistry registry) {
+    decompressorRegistry = registry;
+    return thisT();
+  }
+
+  protected final DecompressorRegistry decompressorRegistry() {
+    return decompressorRegistry;
+  }
+
+  @Override
+  public final T compressorRegistry(CompressorRegistry registry) {
+    compressorRegistry = registry;
+    return thisT();
+  }
+
+  protected final CompressorRegistry compressorRegistry() {
+    return compressorRegistry;
+  }
+
+  @Override
   public ServerImpl build() {
     io.grpc.internal.Server transportServer = buildTransportServer();
     return new ServerImpl(executor, registry, transportServer, Context.ROOT);
diff --git a/core/src/main/java/io/grpc/internal/ClientCallImpl.java b/core/src/main/java/io/grpc/internal/ClientCallImpl.java
index 505ed54..bdd169d 100644
--- a/core/src/main/java/io/grpc/internal/ClientCallImpl.java
+++ b/core/src/main/java/io/grpc/internal/ClientCallImpl.java
@@ -31,6 +31,7 @@
 
 package io.grpc.internal;
 
+import static com.google.common.base.Preconditions.checkState;
 import static com.google.common.collect.Iterables.addAll;
 import static com.google.common.util.concurrent.MoreExecutors.directExecutor;
 import static io.grpc.internal.GrpcUtil.ACCEPT_ENCODING_SPLITER;
@@ -338,6 +339,12 @@
   }
 
   @Override
+  public void setMessageCompression(boolean enabled) {
+    checkState(stream != null, "Not started");
+    stream.setMessageCompression(enabled);
+  }
+
+  @Override
   public boolean isReady() {
     return stream.isReady();
   }
diff --git a/core/src/main/java/io/grpc/internal/ManagedChannelImpl.java b/core/src/main/java/io/grpc/internal/ManagedChannelImpl.java
index 24212f3..b473e56 100644
--- a/core/src/main/java/io/grpc/internal/ManagedChannelImpl.java
+++ b/core/src/main/java/io/grpc/internal/ManagedChannelImpl.java
@@ -108,9 +108,8 @@
    */
   private final Set<String> knownAcceptEncodingRegistry =
       Collections.newSetFromMap(new ConcurrentHashMap<String, Boolean>());
-  private final DecompressorRegistry decompressorRegistry =
-      DecompressorRegistry.getDefaultInstance();
-  private final CompressorRegistry compressorRegistry = CompressorRegistry.getDefaultInstance();
+  private final DecompressorRegistry decompressorRegistry;
+  private final CompressorRegistry compressorRegistry;
 
   /**
    * Executor that runs deadline timers for requests.
@@ -154,9 +153,10 @@
 
   ManagedChannelImpl(String target, BackoffPolicy.Provider backoffPolicyProvider,
       NameResolver.Factory nameResolverFactory, Attributes nameResolverParams,
-      LoadBalancer.Factory loadBalancerFactory,
-      ClientTransportFactory transportFactory, @Nullable Executor executor,
-      @Nullable String userAgent, List<ClientInterceptor> interceptors) {
+      LoadBalancer.Factory loadBalancerFactory, ClientTransportFactory transportFactory,
+      DecompressorRegistry decompressorRegistry, CompressorRegistry compressorRegistry,
+      @Nullable Executor executor, @Nullable String userAgent,
+      List<ClientInterceptor> interceptors) {
     if (executor == null) {
       usingSharedExecutor = true;
       this.executor = SharedResourceHolder.get(GrpcUtil.SHARED_CHANNEL_EXECUTOR);
@@ -171,6 +171,8 @@
     this.userAgent = userAgent;
     this.interceptorChannel = ClientInterceptors.intercept(new RealChannel(), interceptors);
     scheduledExecutor = SharedResourceHolder.get(TIMER_SERVICE);
+    this.decompressorRegistry = decompressorRegistry;
+    this.compressorRegistry = compressorRegistry;
 
     this.nameResolver.start(new NameResolver.Listener() {
       @Override
diff --git a/core/src/test/java/io/grpc/internal/ManagedChannelImplTest.java b/core/src/test/java/io/grpc/internal/ManagedChannelImplTest.java
index 32208f2..ef3fc93 100644
--- a/core/src/test/java/io/grpc/internal/ManagedChannelImplTest.java
+++ b/core/src/test/java/io/grpc/internal/ManagedChannelImplTest.java
@@ -134,7 +134,8 @@
       NameResolver.Factory nameResolverFactory, List<ClientInterceptor> interceptors) {
     return new ManagedChannelImpl(target, new FakeBackoffPolicyProvider(),
         nameResolverFactory, NAME_RESOLVER_PARAMS, loadBalancerFactory,
-        mockTransportFactory, executor, null, interceptors);
+        mockTransportFactory, DecompressorRegistry.getDefaultInstance(),
+        CompressorRegistry.getDefaultInstance(), executor, null, interceptors);
   }
 
   @Before
diff --git a/core/src/test/java/io/grpc/internal/ManagedChannelImplTransportManagerTest.java b/core/src/test/java/io/grpc/internal/ManagedChannelImplTransportManagerTest.java
index 171e5ea..bc606e9 100644
--- a/core/src/test/java/io/grpc/internal/ManagedChannelImplTransportManagerTest.java
+++ b/core/src/test/java/io/grpc/internal/ManagedChannelImplTransportManagerTest.java
@@ -48,6 +48,8 @@
 
 import io.grpc.Attributes;
 import io.grpc.ClientInterceptor;
+import io.grpc.CompressorRegistry;
+import io.grpc.DecompressorRegistry;
 import io.grpc.EquivalentAddressGroup;
 import io.grpc.LoadBalancer;
 import io.grpc.NameResolver;
@@ -119,7 +121,9 @@
 
     channel = new ManagedChannelImpl("fake://target", mockBackoffPolicyProvider,
         nameResolverFactory, Attributes.EMPTY, mockLoadBalancerFactory,
-        mockTransportFactory, executor, null, Collections.<ClientInterceptor>emptyList());
+        mockTransportFactory, DecompressorRegistry.getDefaultInstance(),
+        CompressorRegistry.getDefaultInstance(), executor, null,
+        Collections.<ClientInterceptor>emptyList());
 
     ArgumentCaptor<TransportManager> tmCaptor = ArgumentCaptor.forClass(TransportManager.class);
     verify(mockLoadBalancerFactory).newLoadBalancer(anyString(), tmCaptor.capture());