Expose compression on ClientCall and Server Call
diff --git a/core/src/main/java/io/grpc/ClientCall.java b/core/src/main/java/io/grpc/ClientCall.java
index fadd54e..f6111a7 100644
--- a/core/src/main/java/io/grpc/ClientCall.java
+++ b/core/src/main/java/io/grpc/ClientCall.java
@@ -183,4 +183,13 @@
   public boolean isReady() {
     return true;
   }
+
+  /**
+   * Enables per-message compression, if an encoding type has been negotiated.  If no message
+   * encoding has been negotiated, this is a no-op.
+   */
+  @ExperimentalApi
+  public void setMessageCompression(boolean enabled) {
+    // noop
+  }
 }
diff --git a/core/src/main/java/io/grpc/DecompressorRegistry.java b/core/src/main/java/io/grpc/DecompressorRegistry.java
index 4ed146e..a11fbf1 100644
--- a/core/src/main/java/io/grpc/DecompressorRegistry.java
+++ b/core/src/main/java/io/grpc/DecompressorRegistry.java
@@ -54,7 +54,7 @@
 public final class DecompressorRegistry {
 
   private static final DecompressorRegistry DEFAULT_INSTANCE = new DecompressorRegistry(
-      new DecompressorInfo(new Codec.Gzip(), false),
+      new DecompressorInfo(new Codec.Gzip(), true),
       new DecompressorInfo(Codec.Identity.NONE, false));
 
   public static DecompressorRegistry getDefaultInstance() {
diff --git a/core/src/main/java/io/grpc/ForwardingClientCall.java b/core/src/main/java/io/grpc/ForwardingClientCall.java
index 8d85a72..5d6f1fe 100644
--- a/core/src/main/java/io/grpc/ForwardingClientCall.java
+++ b/core/src/main/java/io/grpc/ForwardingClientCall.java
@@ -66,6 +66,11 @@
   }
 
   @Override
+  public void setMessageCompression(boolean enabled) {
+    delegate().setMessageCompression(enabled);
+  }
+
+  @Override
   public boolean isReady() {
     return delegate().isReady();
   }
diff --git a/core/src/main/java/io/grpc/ManagedChannelBuilder.java b/core/src/main/java/io/grpc/ManagedChannelBuilder.java
index 5d8ca36..3d74c9e 100644
--- a/core/src/main/java/io/grpc/ManagedChannelBuilder.java
+++ b/core/src/main/java/io/grpc/ManagedChannelBuilder.java
@@ -164,6 +164,22 @@
   public abstract T loadBalancerFactory(LoadBalancer.Factory loadBalancerFactory);
 
   /**
+   * Set the decompression registry for use in the channel.  This is an advanced API call and
+   * shouldn't be used unless you are using custom message encoding.   The default supported
+   * decompressors are in {@code DecompressorRegistry.getDefaultInstance}.
+   */
+  @ExperimentalApi
+  public abstract T decompressorRegistry(DecompressorRegistry registry);
+
+  /**
+   * Set the compression registry for use in the channel.  This is an advanced API call and
+   * shouldn't be used unless you are using custom message encoding.   The default supported
+   * compressors are in {@code CompressorRegistry.getDefaultInstance}.
+   */
+  @ExperimentalApi
+  public abstract T compressorRegistry(CompressorRegistry registry);
+
+  /**
    * Builds a channel using the given parameters.
    */
   public abstract ManagedChannel build();
diff --git a/core/src/main/java/io/grpc/ServerBuilder.java b/core/src/main/java/io/grpc/ServerBuilder.java
index 047dfa0..6af7f53 100644
--- a/core/src/main/java/io/grpc/ServerBuilder.java
+++ b/core/src/main/java/io/grpc/ServerBuilder.java
@@ -88,6 +88,22 @@
   public abstract T useTransportSecurity(File certChain, File privateKey);
 
   /**
+   * Set the decompression registry for use in the channel.  This is an advanced API call and
+   * shouldn't be used unless you are using custom message encoding.   The default supported
+   * decompressors are in {@code DecompressorRegistry.getDefaultInstance}.
+   */
+  @ExperimentalApi
+  public abstract T decompressorRegistry(DecompressorRegistry registry);
+
+  /**
+   * Set the compression registry for use in the channel.  This is an advanced API call and
+   * shouldn't be used unless you are using custom message encoding.   The default supported
+   * compressors are in {@code CompressorRegistry.getDefaultInstance}.
+   */
+  @ExperimentalApi
+  public abstract T compressorRegistry(CompressorRegistry registry);
+
+  /**
    * Builds a server using the given parameters.
    *
    * <p>The returned service will not been started or be bound a port. You will need to start it
diff --git a/core/src/main/java/io/grpc/internal/AbstractManagedChannelImplBuilder.java b/core/src/main/java/io/grpc/internal/AbstractManagedChannelImplBuilder.java
index ea79c4e..a610d6c 100644
--- a/core/src/main/java/io/grpc/internal/AbstractManagedChannelImplBuilder.java
+++ b/core/src/main/java/io/grpc/internal/AbstractManagedChannelImplBuilder.java
@@ -31,11 +31,16 @@
 
 package io.grpc.internal;
 
+import static com.google.common.base.MoreObjects.firstNonNull;
+
 import com.google.common.base.Preconditions;
 import com.google.common.util.concurrent.MoreExecutors;
 
 import io.grpc.Attributes;
 import io.grpc.ClientInterceptor;
+import io.grpc.CompressorRegistry;
+import io.grpc.DecompressorRegistry;
+import io.grpc.ExperimentalApi;
 import io.grpc.LoadBalancer;
 import io.grpc.ManagedChannelBuilder;
 import io.grpc.NameResolver;
@@ -82,6 +87,12 @@
   @Nullable
   private LoadBalancer.Factory loadBalancerFactory;
 
+  @Nullable
+  private DecompressorRegistry decompressorRegistry;
+
+  @Nullable
+  private CompressorRegistry compressorRegistry;
+
   protected AbstractManagedChannelImplBuilder(String target) {
     this.target = Preconditions.checkNotNull(target);
     this.directServerAddress = null;
@@ -133,6 +144,20 @@
     return thisT();
   }
 
+  @Override
+  @ExperimentalApi
+  public final T decompressorRegistry(DecompressorRegistry registry) {
+    this.decompressorRegistry = registry;
+    return thisT();
+  }
+
+  @Override
+  @ExperimentalApi
+  public final T compressorRegistry(CompressorRegistry registry) {
+    this.compressorRegistry = registry;
+    return thisT();
+  }
+
   private T thisT() {
     @SuppressWarnings("unchecked")
     T thisT = (T) this;
@@ -168,12 +193,13 @@
         target,
         // TODO(carl-mastrangelo): Allow clients to pass this in
         new ExponentialBackoffPolicy.Provider(),
-        nameResolverFactory == null ? NameResolverRegistry.getDefaultRegistry()
-            : nameResolverFactory,
+        firstNonNull(nameResolverFactory, NameResolverRegistry.getDefaultRegistry()),
         getNameResolverParams(),
-        loadBalancerFactory == null ? SimpleLoadBalancerFactory.getInstance()
-            : loadBalancerFactory,
-        transportFactory, executor, userAgent, interceptors);
+        firstNonNull(loadBalancerFactory, SimpleLoadBalancerFactory.getInstance()),
+        transportFactory,
+        firstNonNull(decompressorRegistry, DecompressorRegistry.getDefaultInstance()),
+        firstNonNull(compressorRegistry, CompressorRegistry.getDefaultInstance()),
+        executor, userAgent, interceptors);
   }
 
   /**
@@ -186,7 +212,7 @@
   /**
    * Subclasses can override this method to provide additional parameters to {@link
    * NameResolver.Factory#newNameResolver}. The default implementation returns {@link
-   * Attributes.EMPTY}.
+   * Attributes#EMPTY}.
    */
   protected Attributes getNameResolverParams() {
     return Attributes.EMPTY;
diff --git a/core/src/main/java/io/grpc/internal/AbstractServerImplBuilder.java b/core/src/main/java/io/grpc/internal/AbstractServerImplBuilder.java
index 6657734..95df85e 100644
--- a/core/src/main/java/io/grpc/internal/AbstractServerImplBuilder.java
+++ b/core/src/main/java/io/grpc/internal/AbstractServerImplBuilder.java
@@ -34,7 +34,9 @@
 import com.google.common.base.Preconditions;
 import com.google.common.util.concurrent.MoreExecutors;
 
+import io.grpc.CompressorRegistry;
 import io.grpc.Context;
+import io.grpc.DecompressorRegistry;
 import io.grpc.HandlerRegistry;
 import io.grpc.Internal;
 import io.grpc.MutableHandlerRegistry;
@@ -58,6 +60,12 @@
   @Nullable
   private Executor executor;
 
+  @Nullable
+  private DecompressorRegistry decompressorRegistry;
+
+  @Nullable
+  private CompressorRegistry compressorRegistry;
+
   /**
    * Constructs using a given handler registry.
    */
@@ -99,6 +107,26 @@
   }
 
   @Override
+  public final T decompressorRegistry(DecompressorRegistry registry) {
+    decompressorRegistry = registry;
+    return thisT();
+  }
+
+  protected final DecompressorRegistry decompressorRegistry() {
+    return decompressorRegistry;
+  }
+
+  @Override
+  public final T compressorRegistry(CompressorRegistry registry) {
+    compressorRegistry = registry;
+    return thisT();
+  }
+
+  protected final CompressorRegistry compressorRegistry() {
+    return compressorRegistry;
+  }
+
+  @Override
   public ServerImpl build() {
     io.grpc.internal.Server transportServer = buildTransportServer();
     return new ServerImpl(executor, registry, transportServer, Context.ROOT);
diff --git a/core/src/main/java/io/grpc/internal/ClientCallImpl.java b/core/src/main/java/io/grpc/internal/ClientCallImpl.java
index 505ed54..bdd169d 100644
--- a/core/src/main/java/io/grpc/internal/ClientCallImpl.java
+++ b/core/src/main/java/io/grpc/internal/ClientCallImpl.java
@@ -31,6 +31,7 @@
 
 package io.grpc.internal;
 
+import static com.google.common.base.Preconditions.checkState;
 import static com.google.common.collect.Iterables.addAll;
 import static com.google.common.util.concurrent.MoreExecutors.directExecutor;
 import static io.grpc.internal.GrpcUtil.ACCEPT_ENCODING_SPLITER;
@@ -338,6 +339,12 @@
   }
 
   @Override
+  public void setMessageCompression(boolean enabled) {
+    checkState(stream != null, "Not started");
+    stream.setMessageCompression(enabled);
+  }
+
+  @Override
   public boolean isReady() {
     return stream.isReady();
   }
diff --git a/core/src/main/java/io/grpc/internal/ManagedChannelImpl.java b/core/src/main/java/io/grpc/internal/ManagedChannelImpl.java
index 24212f3..b473e56 100644
--- a/core/src/main/java/io/grpc/internal/ManagedChannelImpl.java
+++ b/core/src/main/java/io/grpc/internal/ManagedChannelImpl.java
@@ -108,9 +108,8 @@
    */
   private final Set<String> knownAcceptEncodingRegistry =
       Collections.newSetFromMap(new ConcurrentHashMap<String, Boolean>());
-  private final DecompressorRegistry decompressorRegistry =
-      DecompressorRegistry.getDefaultInstance();
-  private final CompressorRegistry compressorRegistry = CompressorRegistry.getDefaultInstance();
+  private final DecompressorRegistry decompressorRegistry;
+  private final CompressorRegistry compressorRegistry;
 
   /**
    * Executor that runs deadline timers for requests.
@@ -154,9 +153,10 @@
 
   ManagedChannelImpl(String target, BackoffPolicy.Provider backoffPolicyProvider,
       NameResolver.Factory nameResolverFactory, Attributes nameResolverParams,
-      LoadBalancer.Factory loadBalancerFactory,
-      ClientTransportFactory transportFactory, @Nullable Executor executor,
-      @Nullable String userAgent, List<ClientInterceptor> interceptors) {
+      LoadBalancer.Factory loadBalancerFactory, ClientTransportFactory transportFactory,
+      DecompressorRegistry decompressorRegistry, CompressorRegistry compressorRegistry,
+      @Nullable Executor executor, @Nullable String userAgent,
+      List<ClientInterceptor> interceptors) {
     if (executor == null) {
       usingSharedExecutor = true;
       this.executor = SharedResourceHolder.get(GrpcUtil.SHARED_CHANNEL_EXECUTOR);
@@ -171,6 +171,8 @@
     this.userAgent = userAgent;
     this.interceptorChannel = ClientInterceptors.intercept(new RealChannel(), interceptors);
     scheduledExecutor = SharedResourceHolder.get(TIMER_SERVICE);
+    this.decompressorRegistry = decompressorRegistry;
+    this.compressorRegistry = compressorRegistry;
 
     this.nameResolver.start(new NameResolver.Listener() {
       @Override
diff --git a/core/src/test/java/io/grpc/internal/ManagedChannelImplTest.java b/core/src/test/java/io/grpc/internal/ManagedChannelImplTest.java
index 32208f2..ef3fc93 100644
--- a/core/src/test/java/io/grpc/internal/ManagedChannelImplTest.java
+++ b/core/src/test/java/io/grpc/internal/ManagedChannelImplTest.java
@@ -134,7 +134,8 @@
       NameResolver.Factory nameResolverFactory, List<ClientInterceptor> interceptors) {
     return new ManagedChannelImpl(target, new FakeBackoffPolicyProvider(),
         nameResolverFactory, NAME_RESOLVER_PARAMS, loadBalancerFactory,
-        mockTransportFactory, executor, null, interceptors);
+        mockTransportFactory, DecompressorRegistry.getDefaultInstance(),
+        CompressorRegistry.getDefaultInstance(), executor, null, interceptors);
   }
 
   @Before
diff --git a/core/src/test/java/io/grpc/internal/ManagedChannelImplTransportManagerTest.java b/core/src/test/java/io/grpc/internal/ManagedChannelImplTransportManagerTest.java
index 171e5ea..bc606e9 100644
--- a/core/src/test/java/io/grpc/internal/ManagedChannelImplTransportManagerTest.java
+++ b/core/src/test/java/io/grpc/internal/ManagedChannelImplTransportManagerTest.java
@@ -48,6 +48,8 @@
 
 import io.grpc.Attributes;
 import io.grpc.ClientInterceptor;
+import io.grpc.CompressorRegistry;
+import io.grpc.DecompressorRegistry;
 import io.grpc.EquivalentAddressGroup;
 import io.grpc.LoadBalancer;
 import io.grpc.NameResolver;
@@ -119,7 +121,9 @@
 
     channel = new ManagedChannelImpl("fake://target", mockBackoffPolicyProvider,
         nameResolverFactory, Attributes.EMPTY, mockLoadBalancerFactory,
-        mockTransportFactory, executor, null, Collections.<ClientInterceptor>emptyList());
+        mockTransportFactory, DecompressorRegistry.getDefaultInstance(),
+        CompressorRegistry.getDefaultInstance(), executor, null,
+        Collections.<ClientInterceptor>emptyList());
 
     ArgumentCaptor<TransportManager> tmCaptor = ArgumentCaptor.forClass(TransportManager.class);
     verify(mockLoadBalancerFactory).newLoadBalancer(anyString(), tmCaptor.capture());
diff --git a/netty/src/main/java/io/grpc/netty/NettyServer.java b/netty/src/main/java/io/grpc/netty/NettyServer.java
index f2e9eb7..f422dc2 100644
--- a/netty/src/main/java/io/grpc/netty/NettyServer.java
+++ b/netty/src/main/java/io/grpc/netty/NettyServer.java
@@ -35,6 +35,8 @@
 import static io.netty.channel.ChannelOption.SO_BACKLOG;
 import static io.netty.channel.ChannelOption.SO_KEEPALIVE;
 
+import io.grpc.CompressorRegistry;
+import io.grpc.DecompressorRegistry;
 import io.grpc.internal.Server;
 import io.grpc.internal.ServerListener;
 import io.grpc.internal.SharedResourceHolder;
@@ -72,6 +74,8 @@
   private EventLoopGroup workerGroup;
   private ServerListener listener;
   private Channel channel;
+  private final DecompressorRegistry decompressorRegistry;
+  private final CompressorRegistry compressorRegistry;
   private final int flowControlWindow;
   private final int maxMessageSize;
   private final int maxHeaderListSize;
@@ -79,7 +83,8 @@
 
   NettyServer(SocketAddress address, Class<? extends ServerChannel> channelType,
               @Nullable EventLoopGroup bossGroup, @Nullable EventLoopGroup workerGroup,
-              ProtocolNegotiator protocolNegotiator, int maxStreamsPerConnection,
+              ProtocolNegotiator protocolNegotiator, DecompressorRegistry decompressorRegistry,
+              CompressorRegistry compressorRegistry, int maxStreamsPerConnection,
               int flowControlWindow, int maxMessageSize, int maxHeaderListSize) {
     this.address = address;
     this.channelType = checkNotNull(channelType, "channelType");
@@ -92,6 +97,8 @@
     this.flowControlWindow = flowControlWindow;
     this.maxMessageSize = maxMessageSize;
     this.maxHeaderListSize = maxHeaderListSize;
+    this.decompressorRegistry = checkNotNull(decompressorRegistry, "decompressorRegistry");
+    this.compressorRegistry = checkNotNull(compressorRegistry, "compressorRegistry");
   }
 
   @Override
@@ -113,13 +120,15 @@
       public void initChannel(Channel ch) throws Exception {
         eventLoopReferenceCounter.retain();
         ch.closeFuture().addListener(new ChannelFutureListener() {
+          @Override
           public void operationComplete(ChannelFuture future) {
             eventLoopReferenceCounter.release();
           }
         });
         NettyServerTransport transport
-            = new NettyServerTransport(ch, protocolNegotiator, maxStreamsPerConnection,
-                flowControlWindow, maxMessageSize, maxHeaderListSize);
+            = new NettyServerTransport(ch, protocolNegotiator, decompressorRegistry,
+                compressorRegistry, maxStreamsPerConnection, flowControlWindow, maxMessageSize,
+                maxHeaderListSize);
         transport.start(listener.transportCreated(transport));
       }
     });
diff --git a/netty/src/main/java/io/grpc/netty/NettyServerBuilder.java b/netty/src/main/java/io/grpc/netty/NettyServerBuilder.java
index d4aca99..449f115 100644
--- a/netty/src/main/java/io/grpc/netty/NettyServerBuilder.java
+++ b/netty/src/main/java/io/grpc/netty/NettyServerBuilder.java
@@ -31,11 +31,14 @@
 
 package io.grpc.netty;
 
+import static com.google.common.base.MoreObjects.firstNonNull;
 import static com.google.common.base.Preconditions.checkArgument;
 import static io.grpc.internal.GrpcUtil.DEFAULT_MAX_MESSAGE_SIZE;
 
 import com.google.common.base.Preconditions;
 
+import io.grpc.CompressorRegistry;
+import io.grpc.DecompressorRegistry;
 import io.grpc.ExperimentalApi;
 import io.grpc.HandlerRegistry;
 import io.grpc.Internal;
@@ -242,7 +245,10 @@
               ProtocolNegotiators.serverPlaintext();
     }
     return new NettyServer(address, channelType, bossEventLoopGroup,
-            workerEventLoopGroup, negotiator, maxConcurrentCallsPerConnection, flowControlWindow,
+            workerEventLoopGroup, negotiator,
+            firstNonNull(decompressorRegistry(), DecompressorRegistry.getDefaultInstance()),
+            firstNonNull(compressorRegistry(), CompressorRegistry.getDefaultInstance()),
+            maxConcurrentCallsPerConnection, flowControlWindow,
             maxMessageSize, maxHeaderListSize);
   }
 
diff --git a/netty/src/main/java/io/grpc/netty/NettyServerHandler.java b/netty/src/main/java/io/grpc/netty/NettyServerHandler.java
index 06b5a3f..01d3eb9 100644
--- a/netty/src/main/java/io/grpc/netty/NettyServerHandler.java
+++ b/netty/src/main/java/io/grpc/netty/NettyServerHandler.java
@@ -32,6 +32,7 @@
 package io.grpc.netty;
 
 import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
 import static io.grpc.netty.Utils.CONTENT_TYPE_GRPC;
 import static io.grpc.netty.Utils.CONTENT_TYPE_HEADER;
 import static io.grpc.netty.Utils.HTTP_METHOD;
@@ -42,6 +43,8 @@
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 
+import io.grpc.CompressorRegistry;
+import io.grpc.DecompressorRegistry;
 import io.grpc.Metadata;
 import io.grpc.Status;
 import io.grpc.internal.GrpcUtil;
@@ -94,6 +97,8 @@
 
   private static final Status GOAWAY_STATUS = Status.UNAVAILABLE;
 
+  private final DecompressorRegistry decompressorRegistry;
+  private final CompressorRegistry compressorRegistry;
   private final Http2Connection.PropertyKey streamKey;
   private final ServerTransportListener transportListener;
   private final int maxMessageSize;
@@ -102,6 +107,8 @@
   private WriteQueue serverWriteQueue;
 
   static NettyServerHandler newHandler(ServerTransportListener transportListener,
+                                       DecompressorRegistry decompressorRegistry,
+                                       CompressorRegistry compressorRegistry,
                                        int maxStreams,
                                        int flowControlWindow,
                                        int maxHeaderListSize,
@@ -114,13 +121,15 @@
         new DefaultHttp2FrameReader(headersDecoder), frameLogger);
     Http2FrameWriter frameWriter =
         new Http2OutboundFrameLogger(new DefaultHttp2FrameWriter(), frameLogger);
-    return newHandler(frameReader, frameWriter, transportListener, maxStreams, flowControlWindow,
-        maxMessageSize);
+    return newHandler(frameReader, frameWriter, transportListener, decompressorRegistry,
+        compressorRegistry, maxStreams, flowControlWindow, maxMessageSize);
   }
 
   @VisibleForTesting
   static NettyServerHandler newHandler(Http2FrameReader frameReader, Http2FrameWriter frameWriter,
                                        ServerTransportListener transportListener,
+                                       DecompressorRegistry decompressorRegistry,
+                                       CompressorRegistry compressorRegistry,
                                        int maxStreams,
                                        int flowControlWindow,
                                        int maxMessageSize) {
@@ -142,19 +151,24 @@
     settings.initialWindowSize(flowControlWindow);
     settings.maxConcurrentStreams(maxStreams);
 
-    return new NettyServerHandler(transportListener, decoder, encoder, settings, maxMessageSize);
+    return new NettyServerHandler(transportListener, decoder, encoder, settings,
+        decompressorRegistry, compressorRegistry, maxMessageSize);
   }
 
   private NettyServerHandler(ServerTransportListener transportListener,
                              Http2ConnectionDecoder decoder,
                              Http2ConnectionEncoder encoder, Http2Settings settings,
+                             DecompressorRegistry decompressorRegistry,
+                             CompressorRegistry compressorRegistry,
                              int maxMessageSize) {
     super(decoder, encoder, settings);
     checkArgument(maxMessageSize >= 0, "maxMessageSize must be >= 0");
     this.maxMessageSize = maxMessageSize;
+    this.decompressorRegistry = checkNotNull(decompressorRegistry, "decompressorRegistry");
+    this.compressorRegistry = checkNotNull(compressorRegistry, "compressorRegistry");
 
     streamKey = encoder.connection().newKey();
-    this.transportListener = Preconditions.checkNotNull(transportListener, "transportListener");
+    this.transportListener = checkNotNull(transportListener, "transportListener");
 
     // Set the frame listener on the decoder.
     decoder().frameListener(new FrameListener());
@@ -192,6 +206,11 @@
       NettyServerStream stream = new NettyServerStream(ctx.channel(), http2Stream, this,
               maxMessageSize);
 
+      // These must be called before inboundHeadersReceived, because the framers depend on knowing
+      // the compression algorithms available before negotiation.
+      stream.setDecompressionRegistry(decompressorRegistry);
+      stream.setCompressionRegistry(compressorRegistry);
+
       Metadata metadata = Utils.convertHeaders(headers);
       stream.inboundHeadersReceived(metadata);
 
diff --git a/netty/src/main/java/io/grpc/netty/NettyServerTransport.java b/netty/src/main/java/io/grpc/netty/NettyServerTransport.java
index e8e5e39..7275c3c 100644
--- a/netty/src/main/java/io/grpc/netty/NettyServerTransport.java
+++ b/netty/src/main/java/io/grpc/netty/NettyServerTransport.java
@@ -31,8 +31,12 @@
 
 package io.grpc.netty;
 
+import static com.google.common.base.Preconditions.checkNotNull;
+
 import com.google.common.base.Preconditions;
 
+import io.grpc.CompressorRegistry;
+import io.grpc.DecompressorRegistry;
 import io.grpc.internal.ServerTransport;
 import io.grpc.internal.ServerTransportListener;
 import io.netty.channel.Channel;
@@ -51,6 +55,8 @@
 
   private final Channel channel;
   private final ProtocolNegotiator protocolNegotiator;
+  private final DecompressorRegistry decompressorRegistry;
+  private final CompressorRegistry compressorRegistry;
   private final int maxStreams;
   private ServerTransportListener listener;
   private boolean terminated;
@@ -58,14 +64,18 @@
   private final int maxMessageSize;
   private final int maxHeaderListSize;
 
-  NettyServerTransport(Channel channel, ProtocolNegotiator protocolNegotiator, int maxStreams,
-                       int flowControlWindow, int maxMessageSize, int maxHeaderListSize) {
+  NettyServerTransport(Channel channel, ProtocolNegotiator protocolNegotiator,
+                       DecompressorRegistry decompressorRegistry,
+                       CompressorRegistry compressorRegistry, int maxStreams, int flowControlWindow,
+                       int maxMessageSize, int maxHeaderListSize) {
     this.channel = Preconditions.checkNotNull(channel, "channel");
     this.protocolNegotiator = Preconditions.checkNotNull(protocolNegotiator, "protocolNegotiator");
     this.maxStreams = maxStreams;
     this.flowControlWindow = flowControlWindow;
     this.maxMessageSize = maxMessageSize;
     this.maxHeaderListSize = maxHeaderListSize;
+    this.decompressorRegistry = checkNotNull(decompressorRegistry, "decompressorRegistry");
+    this.compressorRegistry = checkNotNull(compressorRegistry, "compressorRegistry");
   }
 
   public void start(ServerTransportListener listener) {
@@ -115,7 +125,7 @@
    * Creates the Netty handler to be used in the channel pipeline.
    */
   private NettyServerHandler createHandler(ServerTransportListener transportListener) {
-    return NettyServerHandler.newHandler(transportListener, maxStreams, flowControlWindow,
-        maxHeaderListSize, maxMessageSize);
+    return NettyServerHandler.newHandler(transportListener, decompressorRegistry,
+        compressorRegistry, maxStreams, flowControlWindow, maxHeaderListSize, maxMessageSize);
   }
 }
diff --git a/netty/src/test/java/io/grpc/netty/NettyClientTransportTest.java b/netty/src/test/java/io/grpc/netty/NettyClientTransportTest.java
index fb79187..c2e171f 100644
--- a/netty/src/test/java/io/grpc/netty/NettyClientTransportTest.java
+++ b/netty/src/test/java/io/grpc/netty/NettyClientTransportTest.java
@@ -43,6 +43,8 @@
 import com.google.common.io.ByteStreams;
 import com.google.common.util.concurrent.SettableFuture;
 
+import io.grpc.CompressorRegistry;
+import io.grpc.DecompressorRegistry;
 import io.grpc.Metadata;
 import io.grpc.MethodDescriptor;
 import io.grpc.MethodDescriptor.Marshaller;
@@ -312,7 +314,8 @@
         .ciphers(TestUtils.preferredTestCiphers(), SupportedCipherSuiteFilter.INSTANCE).build();
     ProtocolNegotiator negotiator = ProtocolNegotiators.serverTls(serverContext);
     server = new NettyServer(address, NioServerSocketChannel.class,
-            group, group, negotiator, maxStreamsPerConnection,
+            group, group, negotiator, DecompressorRegistry.getDefaultInstance(),
+            CompressorRegistry.getDefaultInstance(), maxStreamsPerConnection,
             DEFAULT_WINDOW_SIZE, DEFAULT_MAX_MESSAGE_SIZE, maxHeaderListSize);
     server.start(serverListener);
   }
diff --git a/netty/src/test/java/io/grpc/netty/NettyServerHandlerTest.java b/netty/src/test/java/io/grpc/netty/NettyServerHandlerTest.java
index 52ec3fd..9e4c0bb 100644
--- a/netty/src/test/java/io/grpc/netty/NettyServerHandlerTest.java
+++ b/netty/src/test/java/io/grpc/netty/NettyServerHandlerTest.java
@@ -54,6 +54,8 @@
 
 import com.google.common.io.ByteStreams;
 
+import io.grpc.CompressorRegistry;
+import io.grpc.DecompressorRegistry;
 import io.grpc.Metadata;
 import io.grpc.Status;
 import io.grpc.Status.Code;
@@ -345,6 +347,7 @@
   @Override
   protected NettyServerHandler newHandler() {
     return NettyServerHandler.newHandler(frameReader(), frameWriter(), transportListener,
+        DecompressorRegistry.getDefaultInstance(), CompressorRegistry.getDefaultInstance(),
         maxConcurrentStreams, flowControlWindow, DEFAULT_MAX_MESSAGE_SIZE);
   }