Expose compression on ClientCall and ServerCall
diff --git a/core/src/main/java/io/grpc/ClientCall.java b/core/src/main/java/io/grpc/ClientCall.java
index fadd54e..f6111a7 100644
--- a/core/src/main/java/io/grpc/ClientCall.java
+++ b/core/src/main/java/io/grpc/ClientCall.java
@@ -183,4 +183,13 @@
public boolean isReady() {
return true;
}
+
+ /**
+ * Enables per-message compression, if an encoding type has been negotiated. If no message
+ * encoding has been negotiated, this is a no-op.
+ */
+ @ExperimentalApi
+ public void setMessageCompression(boolean enabled) {
+ // noop
+ }
}
diff --git a/core/src/main/java/io/grpc/DecompressorRegistry.java b/core/src/main/java/io/grpc/DecompressorRegistry.java
index 4ed146e..a11fbf1 100644
--- a/core/src/main/java/io/grpc/DecompressorRegistry.java
+++ b/core/src/main/java/io/grpc/DecompressorRegistry.java
@@ -54,7 +54,7 @@
public final class DecompressorRegistry {
private static final DecompressorRegistry DEFAULT_INSTANCE = new DecompressorRegistry(
- new DecompressorInfo(new Codec.Gzip(), false),
+ new DecompressorInfo(new Codec.Gzip(), true),
new DecompressorInfo(Codec.Identity.NONE, false));
public static DecompressorRegistry getDefaultInstance() {
diff --git a/core/src/main/java/io/grpc/ForwardingClientCall.java b/core/src/main/java/io/grpc/ForwardingClientCall.java
index 8d85a72..5d6f1fe 100644
--- a/core/src/main/java/io/grpc/ForwardingClientCall.java
+++ b/core/src/main/java/io/grpc/ForwardingClientCall.java
@@ -66,6 +66,11 @@
}
@Override
+ public void setMessageCompression(boolean enabled) {
+ delegate().setMessageCompression(enabled);
+ }
+
+ @Override
public boolean isReady() {
return delegate().isReady();
}
diff --git a/core/src/main/java/io/grpc/ManagedChannelBuilder.java b/core/src/main/java/io/grpc/ManagedChannelBuilder.java
index 5d8ca36..3d74c9e 100644
--- a/core/src/main/java/io/grpc/ManagedChannelBuilder.java
+++ b/core/src/main/java/io/grpc/ManagedChannelBuilder.java
@@ -164,6 +164,22 @@
public abstract T loadBalancerFactory(LoadBalancer.Factory loadBalancerFactory);
/**
+ * Set the decompression registry for use in the channel. This is an advanced API call and
+ * shouldn't be used unless you are using custom message encoding. The default supported
+ * decompressors are in {@code DecompressorRegistry.getDefaultInstance}.
+ */
+ @ExperimentalApi
+ public abstract T decompressorRegistry(DecompressorRegistry registry);
+
+ /**
+ * Set the compression registry for use in the channel. This is an advanced API call and
+ * shouldn't be used unless you are using custom message encoding. The default supported
+ * compressors are in {@code CompressorRegistry.getDefaultInstance}.
+ */
+ @ExperimentalApi
+ public abstract T compressorRegistry(CompressorRegistry registry);
+
+ /**
* Builds a channel using the given parameters.
*/
public abstract ManagedChannel build();
diff --git a/core/src/main/java/io/grpc/ServerBuilder.java b/core/src/main/java/io/grpc/ServerBuilder.java
index 047dfa0..6af7f53 100644
--- a/core/src/main/java/io/grpc/ServerBuilder.java
+++ b/core/src/main/java/io/grpc/ServerBuilder.java
@@ -88,6 +88,22 @@
public abstract T useTransportSecurity(File certChain, File privateKey);
/**
+ * Set the decompression registry for use in the server. This is an advanced API call and
+ * shouldn't be used unless you are using custom message encoding. The default supported
+ * decompressors are in {@code DecompressorRegistry.getDefaultInstance}.
+ */
+ @ExperimentalApi
+ public abstract T decompressorRegistry(DecompressorRegistry registry);
+
+ /**
+ * Set the compression registry for use in the server. This is an advanced API call and
+ * shouldn't be used unless you are using custom message encoding. The default supported
+ * compressors are in {@code CompressorRegistry.getDefaultInstance}.
+ */
+ @ExperimentalApi
+ public abstract T compressorRegistry(CompressorRegistry registry);
+
+ /**
* Builds a server using the given parameters.
*
* <p>The returned service will not been started or be bound a port. You will need to start it
diff --git a/core/src/main/java/io/grpc/internal/AbstractManagedChannelImplBuilder.java b/core/src/main/java/io/grpc/internal/AbstractManagedChannelImplBuilder.java
index ea79c4e..a610d6c 100644
--- a/core/src/main/java/io/grpc/internal/AbstractManagedChannelImplBuilder.java
+++ b/core/src/main/java/io/grpc/internal/AbstractManagedChannelImplBuilder.java
@@ -31,11 +31,16 @@
package io.grpc.internal;
+import static com.google.common.base.MoreObjects.firstNonNull;
+
import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.MoreExecutors;
import io.grpc.Attributes;
import io.grpc.ClientInterceptor;
+import io.grpc.CompressorRegistry;
+import io.grpc.DecompressorRegistry;
+import io.grpc.ExperimentalApi;
import io.grpc.LoadBalancer;
import io.grpc.ManagedChannelBuilder;
import io.grpc.NameResolver;
@@ -82,6 +87,12 @@
@Nullable
private LoadBalancer.Factory loadBalancerFactory;
+ @Nullable
+ private DecompressorRegistry decompressorRegistry;
+
+ @Nullable
+ private CompressorRegistry compressorRegistry;
+
protected AbstractManagedChannelImplBuilder(String target) {
this.target = Preconditions.checkNotNull(target);
this.directServerAddress = null;
@@ -133,6 +144,20 @@
return thisT();
}
+ @Override
+ @ExperimentalApi
+ public final T decompressorRegistry(DecompressorRegistry registry) {
+ this.decompressorRegistry = registry;
+ return thisT();
+ }
+
+ @Override
+ @ExperimentalApi
+ public final T compressorRegistry(CompressorRegistry registry) {
+ this.compressorRegistry = registry;
+ return thisT();
+ }
+
private T thisT() {
@SuppressWarnings("unchecked")
T thisT = (T) this;
@@ -168,12 +193,13 @@
target,
// TODO(carl-mastrangelo): Allow clients to pass this in
new ExponentialBackoffPolicy.Provider(),
- nameResolverFactory == null ? NameResolverRegistry.getDefaultRegistry()
- : nameResolverFactory,
+ firstNonNull(nameResolverFactory, NameResolverRegistry.getDefaultRegistry()),
getNameResolverParams(),
- loadBalancerFactory == null ? SimpleLoadBalancerFactory.getInstance()
- : loadBalancerFactory,
- transportFactory, executor, userAgent, interceptors);
+ firstNonNull(loadBalancerFactory, SimpleLoadBalancerFactory.getInstance()),
+ transportFactory,
+ firstNonNull(decompressorRegistry, DecompressorRegistry.getDefaultInstance()),
+ firstNonNull(compressorRegistry, CompressorRegistry.getDefaultInstance()),
+ executor, userAgent, interceptors);
}
/**
@@ -186,7 +212,7 @@
/**
* Subclasses can override this method to provide additional parameters to {@link
* NameResolver.Factory#newNameResolver}. The default implementation returns {@link
- * Attributes.EMPTY}.
+ * Attributes#EMPTY}.
*/
protected Attributes getNameResolverParams() {
return Attributes.EMPTY;
diff --git a/core/src/main/java/io/grpc/internal/AbstractServerImplBuilder.java b/core/src/main/java/io/grpc/internal/AbstractServerImplBuilder.java
index 6657734..95df85e 100644
--- a/core/src/main/java/io/grpc/internal/AbstractServerImplBuilder.java
+++ b/core/src/main/java/io/grpc/internal/AbstractServerImplBuilder.java
@@ -34,7 +34,9 @@
import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.MoreExecutors;
+import io.grpc.CompressorRegistry;
import io.grpc.Context;
+import io.grpc.DecompressorRegistry;
import io.grpc.HandlerRegistry;
import io.grpc.Internal;
import io.grpc.MutableHandlerRegistry;
@@ -58,6 +60,12 @@
@Nullable
private Executor executor;
+ @Nullable
+ private DecompressorRegistry decompressorRegistry;
+
+ @Nullable
+ private CompressorRegistry compressorRegistry;
+
/**
* Constructs using a given handler registry.
*/
@@ -99,6 +107,26 @@
}
@Override
+ public final T decompressorRegistry(DecompressorRegistry registry) {
+ decompressorRegistry = registry;
+ return thisT();
+ }
+
+ protected final DecompressorRegistry decompressorRegistry() {
+ return decompressorRegistry;
+ }
+
+ @Override
+ public final T compressorRegistry(CompressorRegistry registry) {
+ compressorRegistry = registry;
+ return thisT();
+ }
+
+ protected final CompressorRegistry compressorRegistry() {
+ return compressorRegistry;
+ }
+
+ @Override
public ServerImpl build() {
io.grpc.internal.Server transportServer = buildTransportServer();
return new ServerImpl(executor, registry, transportServer, Context.ROOT);
diff --git a/core/src/main/java/io/grpc/internal/ClientCallImpl.java b/core/src/main/java/io/grpc/internal/ClientCallImpl.java
index 505ed54..bdd169d 100644
--- a/core/src/main/java/io/grpc/internal/ClientCallImpl.java
+++ b/core/src/main/java/io/grpc/internal/ClientCallImpl.java
@@ -31,6 +31,7 @@
package io.grpc.internal;
+import static com.google.common.base.Preconditions.checkState;
import static com.google.common.collect.Iterables.addAll;
import static com.google.common.util.concurrent.MoreExecutors.directExecutor;
import static io.grpc.internal.GrpcUtil.ACCEPT_ENCODING_SPLITER;
@@ -338,6 +339,12 @@
}
@Override
+ public void setMessageCompression(boolean enabled) {
+ checkState(stream != null, "Not started");
+ stream.setMessageCompression(enabled);
+ }
+
+ @Override
public boolean isReady() {
return stream.isReady();
}
diff --git a/core/src/main/java/io/grpc/internal/ManagedChannelImpl.java b/core/src/main/java/io/grpc/internal/ManagedChannelImpl.java
index 24212f3..b473e56 100644
--- a/core/src/main/java/io/grpc/internal/ManagedChannelImpl.java
+++ b/core/src/main/java/io/grpc/internal/ManagedChannelImpl.java
@@ -108,9 +108,8 @@
*/
private final Set<String> knownAcceptEncodingRegistry =
Collections.newSetFromMap(new ConcurrentHashMap<String, Boolean>());
- private final DecompressorRegistry decompressorRegistry =
- DecompressorRegistry.getDefaultInstance();
- private final CompressorRegistry compressorRegistry = CompressorRegistry.getDefaultInstance();
+ private final DecompressorRegistry decompressorRegistry;
+ private final CompressorRegistry compressorRegistry;
/**
* Executor that runs deadline timers for requests.
@@ -154,9 +153,10 @@
ManagedChannelImpl(String target, BackoffPolicy.Provider backoffPolicyProvider,
NameResolver.Factory nameResolverFactory, Attributes nameResolverParams,
- LoadBalancer.Factory loadBalancerFactory,
- ClientTransportFactory transportFactory, @Nullable Executor executor,
- @Nullable String userAgent, List<ClientInterceptor> interceptors) {
+ LoadBalancer.Factory loadBalancerFactory, ClientTransportFactory transportFactory,
+ DecompressorRegistry decompressorRegistry, CompressorRegistry compressorRegistry,
+ @Nullable Executor executor, @Nullable String userAgent,
+ List<ClientInterceptor> interceptors) {
if (executor == null) {
usingSharedExecutor = true;
this.executor = SharedResourceHolder.get(GrpcUtil.SHARED_CHANNEL_EXECUTOR);
@@ -171,6 +171,8 @@
this.userAgent = userAgent;
this.interceptorChannel = ClientInterceptors.intercept(new RealChannel(), interceptors);
scheduledExecutor = SharedResourceHolder.get(TIMER_SERVICE);
+ this.decompressorRegistry = decompressorRegistry;
+ this.compressorRegistry = compressorRegistry;
this.nameResolver.start(new NameResolver.Listener() {
@Override
diff --git a/core/src/test/java/io/grpc/internal/ManagedChannelImplTest.java b/core/src/test/java/io/grpc/internal/ManagedChannelImplTest.java
index 32208f2..ef3fc93 100644
--- a/core/src/test/java/io/grpc/internal/ManagedChannelImplTest.java
+++ b/core/src/test/java/io/grpc/internal/ManagedChannelImplTest.java
@@ -134,7 +134,8 @@
NameResolver.Factory nameResolverFactory, List<ClientInterceptor> interceptors) {
return new ManagedChannelImpl(target, new FakeBackoffPolicyProvider(),
nameResolverFactory, NAME_RESOLVER_PARAMS, loadBalancerFactory,
- mockTransportFactory, executor, null, interceptors);
+ mockTransportFactory, DecompressorRegistry.getDefaultInstance(),
+ CompressorRegistry.getDefaultInstance(), executor, null, interceptors);
}
@Before
diff --git a/core/src/test/java/io/grpc/internal/ManagedChannelImplTransportManagerTest.java b/core/src/test/java/io/grpc/internal/ManagedChannelImplTransportManagerTest.java
index 171e5ea..bc606e9 100644
--- a/core/src/test/java/io/grpc/internal/ManagedChannelImplTransportManagerTest.java
+++ b/core/src/test/java/io/grpc/internal/ManagedChannelImplTransportManagerTest.java
@@ -48,6 +48,8 @@
import io.grpc.Attributes;
import io.grpc.ClientInterceptor;
+import io.grpc.CompressorRegistry;
+import io.grpc.DecompressorRegistry;
import io.grpc.EquivalentAddressGroup;
import io.grpc.LoadBalancer;
import io.grpc.NameResolver;
@@ -119,7 +121,9 @@
channel = new ManagedChannelImpl("fake://target", mockBackoffPolicyProvider,
nameResolverFactory, Attributes.EMPTY, mockLoadBalancerFactory,
- mockTransportFactory, executor, null, Collections.<ClientInterceptor>emptyList());
+ mockTransportFactory, DecompressorRegistry.getDefaultInstance(),
+ CompressorRegistry.getDefaultInstance(), executor, null,
+ Collections.<ClientInterceptor>emptyList());
ArgumentCaptor<TransportManager> tmCaptor = ArgumentCaptor.forClass(TransportManager.class);
verify(mockLoadBalancerFactory).newLoadBalancer(anyString(), tmCaptor.capture());