Expose compression on ClientCall and Server Call
diff --git a/core/src/main/java/io/grpc/ClientCall.java b/core/src/main/java/io/grpc/ClientCall.java
index fadd54e..f6111a7 100644
--- a/core/src/main/java/io/grpc/ClientCall.java
+++ b/core/src/main/java/io/grpc/ClientCall.java
@@ -183,4 +183,13 @@
public boolean isReady() {
return true;
}
+
+ /**
+ * Enables per-message compression, if an encoding type has been negotiated. If no message
+ * encoding has been negotiated, this is a no-op.
+ */
+ @ExperimentalApi
+ public void setMessageCompression(boolean enabled) {
+ // noop
+ }
}
diff --git a/core/src/main/java/io/grpc/DecompressorRegistry.java b/core/src/main/java/io/grpc/DecompressorRegistry.java
index 4ed146e..a11fbf1 100644
--- a/core/src/main/java/io/grpc/DecompressorRegistry.java
+++ b/core/src/main/java/io/grpc/DecompressorRegistry.java
@@ -54,7 +54,7 @@
public final class DecompressorRegistry {
private static final DecompressorRegistry DEFAULT_INSTANCE = new DecompressorRegistry(
- new DecompressorInfo(new Codec.Gzip(), false),
+ new DecompressorInfo(new Codec.Gzip(), true),
new DecompressorInfo(Codec.Identity.NONE, false));
public static DecompressorRegistry getDefaultInstance() {
diff --git a/core/src/main/java/io/grpc/ForwardingClientCall.java b/core/src/main/java/io/grpc/ForwardingClientCall.java
index 8d85a72..5d6f1fe 100644
--- a/core/src/main/java/io/grpc/ForwardingClientCall.java
+++ b/core/src/main/java/io/grpc/ForwardingClientCall.java
@@ -66,6 +66,11 @@
}
@Override
+ public void setMessageCompression(boolean enabled) {
+ delegate().setMessageCompression(enabled);
+ }
+
+ @Override
public boolean isReady() {
return delegate().isReady();
}
diff --git a/core/src/main/java/io/grpc/ManagedChannelBuilder.java b/core/src/main/java/io/grpc/ManagedChannelBuilder.java
index 5d8ca36..3d74c9e 100644
--- a/core/src/main/java/io/grpc/ManagedChannelBuilder.java
+++ b/core/src/main/java/io/grpc/ManagedChannelBuilder.java
@@ -164,6 +164,22 @@
public abstract T loadBalancerFactory(LoadBalancer.Factory loadBalancerFactory);
/**
+ * Set the decompression registry for use in the channel. This is an advanced API call and
+ * shouldn't be used unless you are using custom message encoding. The default supported
+ * decompressors are in {@code DecompressorRegistry.getDefaultInstance}.
+ */
+ @ExperimentalApi
+ public abstract T decompressorRegistry(DecompressorRegistry registry);
+
+ /**
+ * Set the compression registry for use in the channel. This is an advanced API call and
+ * shouldn't be used unless you are using custom message encoding. The default supported
+ * compressors are in {@code CompressorRegistry.getDefaultInstance}.
+ */
+ @ExperimentalApi
+ public abstract T compressorRegistry(CompressorRegistry registry);
+
+ /**
* Builds a channel using the given parameters.
*/
public abstract ManagedChannel build();
diff --git a/core/src/main/java/io/grpc/ServerBuilder.java b/core/src/main/java/io/grpc/ServerBuilder.java
index 047dfa0..6af7f53 100644
--- a/core/src/main/java/io/grpc/ServerBuilder.java
+++ b/core/src/main/java/io/grpc/ServerBuilder.java
@@ -88,6 +88,22 @@
public abstract T useTransportSecurity(File certChain, File privateKey);
/**
+ * Set the decompression registry for use in the channel. This is an advanced API call and
+ * shouldn't be used unless you are using custom message encoding. The default supported
+ * decompressors are in {@code DecompressorRegistry.getDefaultInstance}.
+ */
+ @ExperimentalApi
+ public abstract T decompressorRegistry(DecompressorRegistry registry);
+
+ /**
+ * Set the compression registry for use in the channel. This is an advanced API call and
+ * shouldn't be used unless you are using custom message encoding. The default supported
+ * compressors are in {@code CompressorRegistry.getDefaultInstance}.
+ */
+ @ExperimentalApi
+ public abstract T compressorRegistry(CompressorRegistry registry);
+
+ /**
* Builds a server using the given parameters.
*
* <p>The returned service will not been started or be bound a port. You will need to start it
diff --git a/core/src/main/java/io/grpc/internal/AbstractManagedChannelImplBuilder.java b/core/src/main/java/io/grpc/internal/AbstractManagedChannelImplBuilder.java
index ea79c4e..a610d6c 100644
--- a/core/src/main/java/io/grpc/internal/AbstractManagedChannelImplBuilder.java
+++ b/core/src/main/java/io/grpc/internal/AbstractManagedChannelImplBuilder.java
@@ -31,11 +31,16 @@
package io.grpc.internal;
+import static com.google.common.base.MoreObjects.firstNonNull;
+
import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.MoreExecutors;
import io.grpc.Attributes;
import io.grpc.ClientInterceptor;
+import io.grpc.CompressorRegistry;
+import io.grpc.DecompressorRegistry;
+import io.grpc.ExperimentalApi;
import io.grpc.LoadBalancer;
import io.grpc.ManagedChannelBuilder;
import io.grpc.NameResolver;
@@ -82,6 +87,12 @@
@Nullable
private LoadBalancer.Factory loadBalancerFactory;
+ @Nullable
+ private DecompressorRegistry decompressorRegistry;
+
+ @Nullable
+ private CompressorRegistry compressorRegistry;
+
protected AbstractManagedChannelImplBuilder(String target) {
this.target = Preconditions.checkNotNull(target);
this.directServerAddress = null;
@@ -133,6 +144,20 @@
return thisT();
}
+ @Override
+ @ExperimentalApi
+ public final T decompressorRegistry(DecompressorRegistry registry) {
+ this.decompressorRegistry = registry;
+ return thisT();
+ }
+
+ @Override
+ @ExperimentalApi
+ public final T compressorRegistry(CompressorRegistry registry) {
+ this.compressorRegistry = registry;
+ return thisT();
+ }
+
private T thisT() {
@SuppressWarnings("unchecked")
T thisT = (T) this;
@@ -168,12 +193,13 @@
target,
// TODO(carl-mastrangelo): Allow clients to pass this in
new ExponentialBackoffPolicy.Provider(),
- nameResolverFactory == null ? NameResolverRegistry.getDefaultRegistry()
- : nameResolverFactory,
+ firstNonNull(nameResolverFactory, NameResolverRegistry.getDefaultRegistry()),
getNameResolverParams(),
- loadBalancerFactory == null ? SimpleLoadBalancerFactory.getInstance()
- : loadBalancerFactory,
- transportFactory, executor, userAgent, interceptors);
+ firstNonNull(loadBalancerFactory, SimpleLoadBalancerFactory.getInstance()),
+ transportFactory,
+ firstNonNull(decompressorRegistry, DecompressorRegistry.getDefaultInstance()),
+ firstNonNull(compressorRegistry, CompressorRegistry.getDefaultInstance()),
+ executor, userAgent, interceptors);
}
/**
@@ -186,7 +212,7 @@
/**
* Subclasses can override this method to provide additional parameters to {@link
* NameResolver.Factory#newNameResolver}. The default implementation returns {@link
- * Attributes.EMPTY}.
+ * Attributes#EMPTY}.
*/
protected Attributes getNameResolverParams() {
return Attributes.EMPTY;
diff --git a/core/src/main/java/io/grpc/internal/AbstractServerImplBuilder.java b/core/src/main/java/io/grpc/internal/AbstractServerImplBuilder.java
index 6657734..95df85e 100644
--- a/core/src/main/java/io/grpc/internal/AbstractServerImplBuilder.java
+++ b/core/src/main/java/io/grpc/internal/AbstractServerImplBuilder.java
@@ -34,7 +34,9 @@
import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.MoreExecutors;
+import io.grpc.CompressorRegistry;
import io.grpc.Context;
+import io.grpc.DecompressorRegistry;
import io.grpc.HandlerRegistry;
import io.grpc.Internal;
import io.grpc.MutableHandlerRegistry;
@@ -58,6 +60,12 @@
@Nullable
private Executor executor;
+ @Nullable
+ private DecompressorRegistry decompressorRegistry;
+
+ @Nullable
+ private CompressorRegistry compressorRegistry;
+
/**
* Constructs using a given handler registry.
*/
@@ -99,6 +107,26 @@
}
@Override
+ public final T decompressorRegistry(DecompressorRegistry registry) {
+ decompressorRegistry = registry;
+ return thisT();
+ }
+
+ protected final DecompressorRegistry decompressorRegistry() {
+ return decompressorRegistry;
+ }
+
+ @Override
+ public final T compressorRegistry(CompressorRegistry registry) {
+ compressorRegistry = registry;
+ return thisT();
+ }
+
+ protected final CompressorRegistry compressorRegistry() {
+ return compressorRegistry;
+ }
+
+ @Override
public ServerImpl build() {
io.grpc.internal.Server transportServer = buildTransportServer();
return new ServerImpl(executor, registry, transportServer, Context.ROOT);
diff --git a/core/src/main/java/io/grpc/internal/ClientCallImpl.java b/core/src/main/java/io/grpc/internal/ClientCallImpl.java
index 505ed54..bdd169d 100644
--- a/core/src/main/java/io/grpc/internal/ClientCallImpl.java
+++ b/core/src/main/java/io/grpc/internal/ClientCallImpl.java
@@ -31,6 +31,7 @@
package io.grpc.internal;
+import static com.google.common.base.Preconditions.checkState;
import static com.google.common.collect.Iterables.addAll;
import static com.google.common.util.concurrent.MoreExecutors.directExecutor;
import static io.grpc.internal.GrpcUtil.ACCEPT_ENCODING_SPLITER;
@@ -338,6 +339,12 @@
}
@Override
+ public void setMessageCompression(boolean enabled) {
+ checkState(stream != null, "Not started");
+ stream.setMessageCompression(enabled);
+ }
+
+ @Override
public boolean isReady() {
return stream.isReady();
}
diff --git a/core/src/main/java/io/grpc/internal/ManagedChannelImpl.java b/core/src/main/java/io/grpc/internal/ManagedChannelImpl.java
index 24212f3..b473e56 100644
--- a/core/src/main/java/io/grpc/internal/ManagedChannelImpl.java
+++ b/core/src/main/java/io/grpc/internal/ManagedChannelImpl.java
@@ -108,9 +108,8 @@
*/
private final Set<String> knownAcceptEncodingRegistry =
Collections.newSetFromMap(new ConcurrentHashMap<String, Boolean>());
- private final DecompressorRegistry decompressorRegistry =
- DecompressorRegistry.getDefaultInstance();
- private final CompressorRegistry compressorRegistry = CompressorRegistry.getDefaultInstance();
+ private final DecompressorRegistry decompressorRegistry;
+ private final CompressorRegistry compressorRegistry;
/**
* Executor that runs deadline timers for requests.
@@ -154,9 +153,10 @@
ManagedChannelImpl(String target, BackoffPolicy.Provider backoffPolicyProvider,
NameResolver.Factory nameResolverFactory, Attributes nameResolverParams,
- LoadBalancer.Factory loadBalancerFactory,
- ClientTransportFactory transportFactory, @Nullable Executor executor,
- @Nullable String userAgent, List<ClientInterceptor> interceptors) {
+ LoadBalancer.Factory loadBalancerFactory, ClientTransportFactory transportFactory,
+ DecompressorRegistry decompressorRegistry, CompressorRegistry compressorRegistry,
+ @Nullable Executor executor, @Nullable String userAgent,
+ List<ClientInterceptor> interceptors) {
if (executor == null) {
usingSharedExecutor = true;
this.executor = SharedResourceHolder.get(GrpcUtil.SHARED_CHANNEL_EXECUTOR);
@@ -171,6 +171,8 @@
this.userAgent = userAgent;
this.interceptorChannel = ClientInterceptors.intercept(new RealChannel(), interceptors);
scheduledExecutor = SharedResourceHolder.get(TIMER_SERVICE);
+ this.decompressorRegistry = decompressorRegistry;
+ this.compressorRegistry = compressorRegistry;
this.nameResolver.start(new NameResolver.Listener() {
@Override
diff --git a/core/src/test/java/io/grpc/internal/ManagedChannelImplTest.java b/core/src/test/java/io/grpc/internal/ManagedChannelImplTest.java
index 32208f2..ef3fc93 100644
--- a/core/src/test/java/io/grpc/internal/ManagedChannelImplTest.java
+++ b/core/src/test/java/io/grpc/internal/ManagedChannelImplTest.java
@@ -134,7 +134,8 @@
NameResolver.Factory nameResolverFactory, List<ClientInterceptor> interceptors) {
return new ManagedChannelImpl(target, new FakeBackoffPolicyProvider(),
nameResolverFactory, NAME_RESOLVER_PARAMS, loadBalancerFactory,
- mockTransportFactory, executor, null, interceptors);
+ mockTransportFactory, DecompressorRegistry.getDefaultInstance(),
+ CompressorRegistry.getDefaultInstance(), executor, null, interceptors);
}
@Before
diff --git a/core/src/test/java/io/grpc/internal/ManagedChannelImplTransportManagerTest.java b/core/src/test/java/io/grpc/internal/ManagedChannelImplTransportManagerTest.java
index 171e5ea..bc606e9 100644
--- a/core/src/test/java/io/grpc/internal/ManagedChannelImplTransportManagerTest.java
+++ b/core/src/test/java/io/grpc/internal/ManagedChannelImplTransportManagerTest.java
@@ -48,6 +48,8 @@
import io.grpc.Attributes;
import io.grpc.ClientInterceptor;
+import io.grpc.CompressorRegistry;
+import io.grpc.DecompressorRegistry;
import io.grpc.EquivalentAddressGroup;
import io.grpc.LoadBalancer;
import io.grpc.NameResolver;
@@ -119,7 +121,9 @@
channel = new ManagedChannelImpl("fake://target", mockBackoffPolicyProvider,
nameResolverFactory, Attributes.EMPTY, mockLoadBalancerFactory,
- mockTransportFactory, executor, null, Collections.<ClientInterceptor>emptyList());
+ mockTransportFactory, DecompressorRegistry.getDefaultInstance(),
+ CompressorRegistry.getDefaultInstance(), executor, null,
+ Collections.<ClientInterceptor>emptyList());
ArgumentCaptor<TransportManager> tmCaptor = ArgumentCaptor.forClass(TransportManager.class);
verify(mockLoadBalancerFactory).newLoadBalancer(anyString(), tmCaptor.capture());
diff --git a/netty/src/main/java/io/grpc/netty/NettyServer.java b/netty/src/main/java/io/grpc/netty/NettyServer.java
index f2e9eb7..f422dc2 100644
--- a/netty/src/main/java/io/grpc/netty/NettyServer.java
+++ b/netty/src/main/java/io/grpc/netty/NettyServer.java
@@ -35,6 +35,8 @@
import static io.netty.channel.ChannelOption.SO_BACKLOG;
import static io.netty.channel.ChannelOption.SO_KEEPALIVE;
+import io.grpc.CompressorRegistry;
+import io.grpc.DecompressorRegistry;
import io.grpc.internal.Server;
import io.grpc.internal.ServerListener;
import io.grpc.internal.SharedResourceHolder;
@@ -72,6 +74,8 @@
private EventLoopGroup workerGroup;
private ServerListener listener;
private Channel channel;
+ private final DecompressorRegistry decompressorRegistry;
+ private final CompressorRegistry compressorRegistry;
private final int flowControlWindow;
private final int maxMessageSize;
private final int maxHeaderListSize;
@@ -79,7 +83,8 @@
NettyServer(SocketAddress address, Class<? extends ServerChannel> channelType,
@Nullable EventLoopGroup bossGroup, @Nullable EventLoopGroup workerGroup,
- ProtocolNegotiator protocolNegotiator, int maxStreamsPerConnection,
+ ProtocolNegotiator protocolNegotiator, DecompressorRegistry decompressorRegistry,
+ CompressorRegistry compressorRegistry, int maxStreamsPerConnection,
int flowControlWindow, int maxMessageSize, int maxHeaderListSize) {
this.address = address;
this.channelType = checkNotNull(channelType, "channelType");
@@ -92,6 +97,8 @@
this.flowControlWindow = flowControlWindow;
this.maxMessageSize = maxMessageSize;
this.maxHeaderListSize = maxHeaderListSize;
+ this.decompressorRegistry = checkNotNull(decompressorRegistry, "decompressorRegistry");
+ this.compressorRegistry = checkNotNull(compressorRegistry, "compressorRegistry");
}
@Override
@@ -113,13 +120,15 @@
public void initChannel(Channel ch) throws Exception {
eventLoopReferenceCounter.retain();
ch.closeFuture().addListener(new ChannelFutureListener() {
+ @Override
public void operationComplete(ChannelFuture future) {
eventLoopReferenceCounter.release();
}
});
NettyServerTransport transport
- = new NettyServerTransport(ch, protocolNegotiator, maxStreamsPerConnection,
- flowControlWindow, maxMessageSize, maxHeaderListSize);
+ = new NettyServerTransport(ch, protocolNegotiator, decompressorRegistry,
+ compressorRegistry, maxStreamsPerConnection, flowControlWindow, maxMessageSize,
+ maxHeaderListSize);
transport.start(listener.transportCreated(transport));
}
});
diff --git a/netty/src/main/java/io/grpc/netty/NettyServerBuilder.java b/netty/src/main/java/io/grpc/netty/NettyServerBuilder.java
index d4aca99..449f115 100644
--- a/netty/src/main/java/io/grpc/netty/NettyServerBuilder.java
+++ b/netty/src/main/java/io/grpc/netty/NettyServerBuilder.java
@@ -31,11 +31,14 @@
package io.grpc.netty;
+import static com.google.common.base.MoreObjects.firstNonNull;
import static com.google.common.base.Preconditions.checkArgument;
import static io.grpc.internal.GrpcUtil.DEFAULT_MAX_MESSAGE_SIZE;
import com.google.common.base.Preconditions;
+import io.grpc.CompressorRegistry;
+import io.grpc.DecompressorRegistry;
import io.grpc.ExperimentalApi;
import io.grpc.HandlerRegistry;
import io.grpc.Internal;
@@ -242,7 +245,10 @@
ProtocolNegotiators.serverPlaintext();
}
return new NettyServer(address, channelType, bossEventLoopGroup,
- workerEventLoopGroup, negotiator, maxConcurrentCallsPerConnection, flowControlWindow,
+ workerEventLoopGroup, negotiator,
+ firstNonNull(decompressorRegistry(), DecompressorRegistry.getDefaultInstance()),
+ firstNonNull(compressorRegistry(), CompressorRegistry.getDefaultInstance()),
+ maxConcurrentCallsPerConnection, flowControlWindow,
maxMessageSize, maxHeaderListSize);
}
diff --git a/netty/src/main/java/io/grpc/netty/NettyServerHandler.java b/netty/src/main/java/io/grpc/netty/NettyServerHandler.java
index 06b5a3f..01d3eb9 100644
--- a/netty/src/main/java/io/grpc/netty/NettyServerHandler.java
+++ b/netty/src/main/java/io/grpc/netty/NettyServerHandler.java
@@ -32,6 +32,7 @@
package io.grpc.netty;
import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
import static io.grpc.netty.Utils.CONTENT_TYPE_GRPC;
import static io.grpc.netty.Utils.CONTENT_TYPE_HEADER;
import static io.grpc.netty.Utils.HTTP_METHOD;
@@ -42,6 +43,8 @@
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
+import io.grpc.CompressorRegistry;
+import io.grpc.DecompressorRegistry;
import io.grpc.Metadata;
import io.grpc.Status;
import io.grpc.internal.GrpcUtil;
@@ -94,6 +97,8 @@
private static final Status GOAWAY_STATUS = Status.UNAVAILABLE;
+ private final DecompressorRegistry decompressorRegistry;
+ private final CompressorRegistry compressorRegistry;
private final Http2Connection.PropertyKey streamKey;
private final ServerTransportListener transportListener;
private final int maxMessageSize;
@@ -102,6 +107,8 @@
private WriteQueue serverWriteQueue;
static NettyServerHandler newHandler(ServerTransportListener transportListener,
+ DecompressorRegistry decompressorRegistry,
+ CompressorRegistry compressorRegistry,
int maxStreams,
int flowControlWindow,
int maxHeaderListSize,
@@ -114,13 +121,15 @@
new DefaultHttp2FrameReader(headersDecoder), frameLogger);
Http2FrameWriter frameWriter =
new Http2OutboundFrameLogger(new DefaultHttp2FrameWriter(), frameLogger);
- return newHandler(frameReader, frameWriter, transportListener, maxStreams, flowControlWindow,
- maxMessageSize);
+ return newHandler(frameReader, frameWriter, transportListener, decompressorRegistry,
+ compressorRegistry, maxStreams, flowControlWindow, maxMessageSize);
}
@VisibleForTesting
static NettyServerHandler newHandler(Http2FrameReader frameReader, Http2FrameWriter frameWriter,
ServerTransportListener transportListener,
+ DecompressorRegistry decompressorRegistry,
+ CompressorRegistry compressorRegistry,
int maxStreams,
int flowControlWindow,
int maxMessageSize) {
@@ -142,19 +151,24 @@
settings.initialWindowSize(flowControlWindow);
settings.maxConcurrentStreams(maxStreams);
- return new NettyServerHandler(transportListener, decoder, encoder, settings, maxMessageSize);
+ return new NettyServerHandler(transportListener, decoder, encoder, settings,
+ decompressorRegistry, compressorRegistry, maxMessageSize);
}
private NettyServerHandler(ServerTransportListener transportListener,
Http2ConnectionDecoder decoder,
Http2ConnectionEncoder encoder, Http2Settings settings,
+ DecompressorRegistry decompressorRegistry,
+ CompressorRegistry compressorRegistry,
int maxMessageSize) {
super(decoder, encoder, settings);
checkArgument(maxMessageSize >= 0, "maxMessageSize must be >= 0");
this.maxMessageSize = maxMessageSize;
+ this.decompressorRegistry = checkNotNull(decompressorRegistry, "decompressorRegistry");
+ this.compressorRegistry = checkNotNull(compressorRegistry, "compressorRegistry");
streamKey = encoder.connection().newKey();
- this.transportListener = Preconditions.checkNotNull(transportListener, "transportListener");
+ this.transportListener = checkNotNull(transportListener, "transportListener");
// Set the frame listener on the decoder.
decoder().frameListener(new FrameListener());
@@ -192,6 +206,11 @@
NettyServerStream stream = new NettyServerStream(ctx.channel(), http2Stream, this,
maxMessageSize);
+ // These must be called before inboundHeadersReceived, because the framers depend on knowing
+ // the compression algorithms available before negotiation.
+ stream.setDecompressionRegistry(decompressorRegistry);
+ stream.setCompressionRegistry(compressorRegistry);
+
Metadata metadata = Utils.convertHeaders(headers);
stream.inboundHeadersReceived(metadata);
diff --git a/netty/src/main/java/io/grpc/netty/NettyServerTransport.java b/netty/src/main/java/io/grpc/netty/NettyServerTransport.java
index e8e5e39..7275c3c 100644
--- a/netty/src/main/java/io/grpc/netty/NettyServerTransport.java
+++ b/netty/src/main/java/io/grpc/netty/NettyServerTransport.java
@@ -31,8 +31,12 @@
package io.grpc.netty;
+import static com.google.common.base.Preconditions.checkNotNull;
+
import com.google.common.base.Preconditions;
+import io.grpc.CompressorRegistry;
+import io.grpc.DecompressorRegistry;
import io.grpc.internal.ServerTransport;
import io.grpc.internal.ServerTransportListener;
import io.netty.channel.Channel;
@@ -51,6 +55,8 @@
private final Channel channel;
private final ProtocolNegotiator protocolNegotiator;
+ private final DecompressorRegistry decompressorRegistry;
+ private final CompressorRegistry compressorRegistry;
private final int maxStreams;
private ServerTransportListener listener;
private boolean terminated;
@@ -58,14 +64,18 @@
private final int maxMessageSize;
private final int maxHeaderListSize;
- NettyServerTransport(Channel channel, ProtocolNegotiator protocolNegotiator, int maxStreams,
- int flowControlWindow, int maxMessageSize, int maxHeaderListSize) {
+ NettyServerTransport(Channel channel, ProtocolNegotiator protocolNegotiator,
+ DecompressorRegistry decompressorRegistry,
+ CompressorRegistry compressorRegistry, int maxStreams, int flowControlWindow,
+ int maxMessageSize, int maxHeaderListSize) {
this.channel = Preconditions.checkNotNull(channel, "channel");
this.protocolNegotiator = Preconditions.checkNotNull(protocolNegotiator, "protocolNegotiator");
this.maxStreams = maxStreams;
this.flowControlWindow = flowControlWindow;
this.maxMessageSize = maxMessageSize;
this.maxHeaderListSize = maxHeaderListSize;
+ this.decompressorRegistry = checkNotNull(decompressorRegistry, "decompressorRegistry");
+ this.compressorRegistry = checkNotNull(compressorRegistry, "compressorRegistry");
}
public void start(ServerTransportListener listener) {
@@ -115,7 +125,7 @@
* Creates the Netty handler to be used in the channel pipeline.
*/
private NettyServerHandler createHandler(ServerTransportListener transportListener) {
- return NettyServerHandler.newHandler(transportListener, maxStreams, flowControlWindow,
- maxHeaderListSize, maxMessageSize);
+ return NettyServerHandler.newHandler(transportListener, decompressorRegistry,
+ compressorRegistry, maxStreams, flowControlWindow, maxHeaderListSize, maxMessageSize);
}
}
diff --git a/netty/src/test/java/io/grpc/netty/NettyClientTransportTest.java b/netty/src/test/java/io/grpc/netty/NettyClientTransportTest.java
index fb79187..c2e171f 100644
--- a/netty/src/test/java/io/grpc/netty/NettyClientTransportTest.java
+++ b/netty/src/test/java/io/grpc/netty/NettyClientTransportTest.java
@@ -43,6 +43,8 @@
import com.google.common.io.ByteStreams;
import com.google.common.util.concurrent.SettableFuture;
+import io.grpc.CompressorRegistry;
+import io.grpc.DecompressorRegistry;
import io.grpc.Metadata;
import io.grpc.MethodDescriptor;
import io.grpc.MethodDescriptor.Marshaller;
@@ -312,7 +314,8 @@
.ciphers(TestUtils.preferredTestCiphers(), SupportedCipherSuiteFilter.INSTANCE).build();
ProtocolNegotiator negotiator = ProtocolNegotiators.serverTls(serverContext);
server = new NettyServer(address, NioServerSocketChannel.class,
- group, group, negotiator, maxStreamsPerConnection,
+ group, group, negotiator, DecompressorRegistry.getDefaultInstance(),
+ CompressorRegistry.getDefaultInstance(), maxStreamsPerConnection,
DEFAULT_WINDOW_SIZE, DEFAULT_MAX_MESSAGE_SIZE, maxHeaderListSize);
server.start(serverListener);
}
diff --git a/netty/src/test/java/io/grpc/netty/NettyServerHandlerTest.java b/netty/src/test/java/io/grpc/netty/NettyServerHandlerTest.java
index 52ec3fd..9e4c0bb 100644
--- a/netty/src/test/java/io/grpc/netty/NettyServerHandlerTest.java
+++ b/netty/src/test/java/io/grpc/netty/NettyServerHandlerTest.java
@@ -54,6 +54,8 @@
import com.google.common.io.ByteStreams;
+import io.grpc.CompressorRegistry;
+import io.grpc.DecompressorRegistry;
import io.grpc.Metadata;
import io.grpc.Status;
import io.grpc.Status.Code;
@@ -345,6 +347,7 @@
@Override
protected NettyServerHandler newHandler() {
return NettyServerHandler.newHandler(frameReader(), frameWriter(), transportListener,
+ DecompressorRegistry.getDefaultInstance(), CompressorRegistry.getDefaultInstance(),
maxConcurrentStreams, flowControlWindow, DEFAULT_MAX_MESSAGE_SIZE);
}