Optimize blocking calls to avoid app thread pool
This reduces the necessary number of threads in the application executor
and provides a small improvement in latency (~15μs, which is normally in
the noise, but would be a 5% improvement).
Benchmark (direct) (transport) Mode Cnt Score Error Units
Before:
TransportBenchmark.unaryCall1024 true INPROCESS avgt 10 1566.168 ± 13.677 ns/op
TransportBenchmark.unaryCall1024 false INPROCESS avgt 10 35769.532 ± 2358.967 ns/op
After:
TransportBenchmark.unaryCall1024 true INPROCESS avgt 10 1813.778 ± 19.995 ns/op
TransportBenchmark.unaryCall1024 false INPROCESS avgt 10 18568.223 ± 1679.306 ns/op
The benchmark results are exactly what we would expect, assuming that
half of the benefit of direct is on server and half on client:
1566 + (35769 - 1566) / 2 = 18668 ns --vs-- 18568 ns
It is expected that direct=true would get worse, because
SerializingExecutor is now used instead of
SerializeReentrantCallsDirectExecutor plus the additional cost of
ThreadlessExecutor.
In the future we could try to detect the ThreadlessExecutor and ellide
Serializ*Executor completely (as is possible for any single-threaded
executor). We could also optimize the queue used in ThreadlessExecutor
to be single-producer, single-consumer. I don't expect to do those
optimizations soon, however.
diff --git a/benchmarks/src/generated/main/grpc/io/grpc/testing/TestServiceGrpc.java b/benchmarks/src/generated/main/grpc/io/grpc/testing/TestServiceGrpc.java
index 170a631..45881e3 100644
--- a/benchmarks/src/generated/main/grpc/io/grpc/testing/TestServiceGrpc.java
+++ b/benchmarks/src/generated/main/grpc/io/grpc/testing/TestServiceGrpc.java
@@ -126,7 +126,7 @@
@java.lang.Override
public io.grpc.testing.SimpleResponse unaryCall(io.grpc.testing.SimpleRequest request) {
return blockingUnaryCall(
- getChannel().newCall(METHOD_UNARY_CALL, getCallOptions()), request);
+ getChannel(), METHOD_UNARY_CALL, getCallOptions(), request);
}
}
diff --git a/benchmarks/src/jmh/java/io/grpc/benchmarks/TransportBenchmark.java b/benchmarks/src/jmh/java/io/grpc/benchmarks/TransportBenchmark.java
index 6de0add..8381bf5 100644
--- a/benchmarks/src/jmh/java/io/grpc/benchmarks/TransportBenchmark.java
+++ b/benchmarks/src/jmh/java/io/grpc/benchmarks/TransportBenchmark.java
@@ -129,6 +129,7 @@
if (direct) {
serverBuilder.directExecutor();
+ // Because blocking stubs avoid the executor, this doesn't do much.
channelBuilder.directExecutor();
}
diff --git a/compiler/src/java_plugin/cpp/java_generator.cpp b/compiler/src/java_plugin/cpp/java_generator.cpp
index 49a1f54..cdb17c0 100644
--- a/compiler/src/java_plugin/cpp/java_generator.cpp
+++ b/compiler/src/java_plugin/cpp/java_generator.cpp
@@ -407,7 +407,7 @@
p->Print(
*vars,
"return $calls_method$(\n"
- " getChannel().newCall($method_field_name$, getCallOptions()), $params$);\n");
+ " getChannel(), $method_field_name$, getCallOptions(), $params$);\n");
break;
case ASYNC_CALL:
if (server_streaming) {
diff --git a/compiler/src/test/golden/TestService.java.txt b/compiler/src/test/golden/TestService.java.txt
index 77d5bb2..904963e 100644
--- a/compiler/src/test/golden/TestService.java.txt
+++ b/compiler/src/test/golden/TestService.java.txt
@@ -186,14 +186,14 @@
@java.lang.Override
public io.grpc.testing.integration.Test.SimpleResponse unaryCall(io.grpc.testing.integration.Test.SimpleRequest request) {
return blockingUnaryCall(
- getChannel().newCall(METHOD_UNARY_CALL, getCallOptions()), request);
+ getChannel(), METHOD_UNARY_CALL, getCallOptions(), request);
}
@java.lang.Override
public java.util.Iterator<io.grpc.testing.integration.Test.StreamingOutputCallResponse> streamingOutputCall(
io.grpc.testing.integration.Test.StreamingOutputCallRequest request) {
return blockingServerStreamingCall(
- getChannel().newCall(METHOD_STREAMING_OUTPUT_CALL, getCallOptions()), request);
+ getChannel(), METHOD_STREAMING_OUTPUT_CALL, getCallOptions(), request);
}
}
diff --git a/compiler/src/test/golden/TestServiceNano.java.txt b/compiler/src/test/golden/TestServiceNano.java.txt
index f8c1ff3..4d8d1ba 100644
--- a/compiler/src/test/golden/TestServiceNano.java.txt
+++ b/compiler/src/test/golden/TestServiceNano.java.txt
@@ -264,14 +264,14 @@
@java.lang.Override
public io.grpc.testing.integration.nano.Test.SimpleResponse unaryCall(io.grpc.testing.integration.nano.Test.SimpleRequest request) {
return blockingUnaryCall(
- getChannel().newCall(METHOD_UNARY_CALL, getCallOptions()), request);
+ getChannel(), METHOD_UNARY_CALL, getCallOptions(), request);
}
@java.lang.Override
public java.util.Iterator<io.grpc.testing.integration.nano.Test.StreamingOutputCallResponse> streamingOutputCall(
io.grpc.testing.integration.nano.Test.StreamingOutputCallRequest request) {
return blockingServerStreamingCall(
- getChannel().newCall(METHOD_STREAMING_OUTPUT_CALL, getCallOptions()), request);
+ getChannel(), METHOD_STREAMING_OUTPUT_CALL, getCallOptions(), request);
}
}
diff --git a/examples/src/generated/main/grpc/io/grpc/examples/helloworld/GreeterGrpc.java b/examples/src/generated/main/grpc/io/grpc/examples/helloworld/GreeterGrpc.java
index ac400a5..d423b2d 100644
--- a/examples/src/generated/main/grpc/io/grpc/examples/helloworld/GreeterGrpc.java
+++ b/examples/src/generated/main/grpc/io/grpc/examples/helloworld/GreeterGrpc.java
@@ -107,7 +107,7 @@
@java.lang.Override
public io.grpc.examples.helloworld.HelloResponse sayHello(io.grpc.examples.helloworld.HelloRequest request) {
return blockingUnaryCall(
- getChannel().newCall(METHOD_SAY_HELLO, getCallOptions()), request);
+ getChannel(), METHOD_SAY_HELLO, getCallOptions(), request);
}
}
diff --git a/examples/src/generated/main/grpc/io/grpc/examples/routeguide/RouteGuideGrpc.java b/examples/src/generated/main/grpc/io/grpc/examples/routeguide/RouteGuideGrpc.java
index 6fe7108..6f732d1 100644
--- a/examples/src/generated/main/grpc/io/grpc/examples/routeguide/RouteGuideGrpc.java
+++ b/examples/src/generated/main/grpc/io/grpc/examples/routeguide/RouteGuideGrpc.java
@@ -167,14 +167,14 @@
@java.lang.Override
public io.grpc.examples.routeguide.Feature getFeature(io.grpc.examples.routeguide.Point request) {
return blockingUnaryCall(
- getChannel().newCall(METHOD_GET_FEATURE, getCallOptions()), request);
+ getChannel(), METHOD_GET_FEATURE, getCallOptions(), request);
}
@java.lang.Override
public java.util.Iterator<io.grpc.examples.routeguide.Feature> listFeatures(
io.grpc.examples.routeguide.Rectangle request) {
return blockingServerStreamingCall(
- getChannel().newCall(METHOD_LIST_FEATURES, getCallOptions()), request);
+ getChannel(), METHOD_LIST_FEATURES, getCallOptions(), request);
}
}
diff --git a/interop-testing/src/generated/main/grpc/io/grpc/testing/integration/ReconnectServiceGrpc.java b/interop-testing/src/generated/main/grpc/io/grpc/testing/integration/ReconnectServiceGrpc.java
index 6a573ad..d27309f 100644
--- a/interop-testing/src/generated/main/grpc/io/grpc/testing/integration/ReconnectServiceGrpc.java
+++ b/interop-testing/src/generated/main/grpc/io/grpc/testing/integration/ReconnectServiceGrpc.java
@@ -131,13 +131,13 @@
@java.lang.Override
public com.google.protobuf.EmptyProtos.Empty start(com.google.protobuf.EmptyProtos.Empty request) {
return blockingUnaryCall(
- getChannel().newCall(METHOD_START, getCallOptions()), request);
+ getChannel(), METHOD_START, getCallOptions(), request);
}
@java.lang.Override
public io.grpc.testing.integration.Messages.ReconnectInfo stop(com.google.protobuf.EmptyProtos.Empty request) {
return blockingUnaryCall(
- getChannel().newCall(METHOD_STOP, getCallOptions()), request);
+ getChannel(), METHOD_STOP, getCallOptions(), request);
}
}
diff --git a/interop-testing/src/generated/main/grpc/io/grpc/testing/integration/TestServiceGrpc.java b/interop-testing/src/generated/main/grpc/io/grpc/testing/integration/TestServiceGrpc.java
index c06c7b5..a0939a6 100644
--- a/interop-testing/src/generated/main/grpc/io/grpc/testing/integration/TestServiceGrpc.java
+++ b/interop-testing/src/generated/main/grpc/io/grpc/testing/integration/TestServiceGrpc.java
@@ -210,20 +210,20 @@
@java.lang.Override
public com.google.protobuf.EmptyProtos.Empty emptyCall(com.google.protobuf.EmptyProtos.Empty request) {
return blockingUnaryCall(
- getChannel().newCall(METHOD_EMPTY_CALL, getCallOptions()), request);
+ getChannel(), METHOD_EMPTY_CALL, getCallOptions(), request);
}
@java.lang.Override
public io.grpc.testing.integration.Messages.SimpleResponse unaryCall(io.grpc.testing.integration.Messages.SimpleRequest request) {
return blockingUnaryCall(
- getChannel().newCall(METHOD_UNARY_CALL, getCallOptions()), request);
+ getChannel(), METHOD_UNARY_CALL, getCallOptions(), request);
}
@java.lang.Override
public java.util.Iterator<io.grpc.testing.integration.Messages.StreamingOutputCallResponse> streamingOutputCall(
io.grpc.testing.integration.Messages.StreamingOutputCallRequest request) {
return blockingServerStreamingCall(
- getChannel().newCall(METHOD_STREAMING_OUTPUT_CALL, getCallOptions()), request);
+ getChannel(), METHOD_STREAMING_OUTPUT_CALL, getCallOptions(), request);
}
}
diff --git a/interop-testing/src/generated/main/grpc/io/grpc/testing/integration/UnimplementedServiceGrpc.java b/interop-testing/src/generated/main/grpc/io/grpc/testing/integration/UnimplementedServiceGrpc.java
index 2436c26..3cf6587 100644
--- a/interop-testing/src/generated/main/grpc/io/grpc/testing/integration/UnimplementedServiceGrpc.java
+++ b/interop-testing/src/generated/main/grpc/io/grpc/testing/integration/UnimplementedServiceGrpc.java
@@ -107,7 +107,7 @@
@java.lang.Override
public com.google.protobuf.EmptyProtos.Empty unimplementedCall(com.google.protobuf.EmptyProtos.Empty request) {
return blockingUnaryCall(
- getChannel().newCall(METHOD_UNIMPLEMENTED_CALL, getCallOptions()), request);
+ getChannel(), METHOD_UNIMPLEMENTED_CALL, getCallOptions(), request);
}
}
diff --git a/stub/src/main/java/io/grpc/stub/ClientCalls.java b/stub/src/main/java/io/grpc/stub/ClientCalls.java
index a3f3c81..73fa3f2 100644
--- a/stub/src/main/java/io/grpc/stub/ClientCalls.java
+++ b/stub/src/main/java/io/grpc/stub/ClientCalls.java
@@ -36,8 +36,11 @@
import com.google.common.util.concurrent.AbstractFuture;
import com.google.common.util.concurrent.ListenableFuture;
+import io.grpc.CallOptions;
+import io.grpc.Channel;
import io.grpc.ClientCall;
import io.grpc.Metadata;
+import io.grpc.MethodDescriptor;
import io.grpc.Status;
import java.util.Iterator;
@@ -45,7 +48,11 @@
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executor;
import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.logging.Level;
+import java.util.logging.Logger;
import javax.annotation.Nullable;
@@ -55,6 +62,8 @@
* that the runtime can vary behavior without requiring regeneration of the stub.
*/
public class ClientCalls {
+ private static final Logger log = Logger.getLogger(ClientCalls.class.getName());
+
// Prevent instantiation
private ClientCalls() {}
@@ -111,6 +120,32 @@
}
/**
+ * Executes a unary call and blocks on the response.
+ *
+ * @return the single response message.
+ */
+ public static <ReqT, RespT> RespT blockingUnaryCall(
+ Channel channel, MethodDescriptor<ReqT, RespT> method, CallOptions callOptions, ReqT param) {
+ ThreadlessExecutor executor = new ThreadlessExecutor();
+ ClientCall<ReqT, RespT> call = channel.newCall(method, callOptions.withExecutor(executor));
+ try {
+ ListenableFuture<RespT> responseFuture = futureUnaryCall(call, param);
+ while (!responseFuture.isDone()) {
+ try {
+ executor.waitAndDrain();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new RuntimeException(e);
+ }
+ }
+ return getUnchecked(responseFuture);
+ } catch (Throwable t) {
+ call.cancel();
+ throw Throwables.propagate(t);
+ }
+ }
+
+ /**
* Executes a server-streaming call returning a blocking {@link Iterator} over the
* response stream.
* @return an iterator over the response stream.
@@ -124,6 +159,22 @@
}
/**
+ * Executes a server-streaming call returning a blocking {@link Iterator} over the
+ * response stream.
+ *
+ * @return an iterator over the response stream.
+ */
+ // TODO(louiscryan): Not clear if we want to use this idiom for 'simple' stubs.
+ public static <ReqT, RespT> Iterator<RespT> blockingServerStreamingCall(
+ Channel channel, MethodDescriptor<ReqT, RespT> method, CallOptions callOptions, ReqT param) {
+ ThreadlessExecutor executor = new ThreadlessExecutor();
+ ClientCall<ReqT, RespT> call = channel.newCall(method, callOptions.withExecutor(executor));
+ BlockingResponseStream<RespT> result = new BlockingResponseStream<RespT>(call, executor);
+ asyncUnaryRequestCall(call, param, result.listener(), true);
+ return result;
+ }
+
+ /**
* Executes a unary call and returns a {@link ListenableFuture} to the response.
*
* @return a future for the single response message.
@@ -345,26 +396,48 @@
private final BlockingQueue<Object> buffer = new ArrayBlockingQueue<Object>(2);
private final ClientCall.Listener<T> listener = new QueuingListener();
private final ClientCall<?, T> call;
+ /** May be null. */
+ private final ThreadlessExecutor threadless;
// Only accessed when iterating.
private Object last;
private BlockingResponseStream(ClientCall<?, T> call) {
+ this(call, null);
+ }
+
+ private BlockingResponseStream(ClientCall<?, T> call, ThreadlessExecutor threadless) {
this.call = call;
+ this.threadless = threadless;
}
ClientCall.Listener<T> listener() {
return listener;
}
+ private Object waitForNext() throws InterruptedException {
+ if (threadless == null) {
+ return buffer.take();
+ } else {
+ Object next = buffer.poll();
+ while (next == null) {
+ threadless.waitAndDrain();
+ next = buffer.poll();
+ }
+ return next;
+ }
+ }
+
@Override
public boolean hasNext() {
- try {
- // Will block here indefinitely waiting for content. RPC timeouts defend against permanent
- // hangs here as the call will become closed.
- last = (last == null) ? buffer.take() : last;
- } catch (InterruptedException ie) {
- Thread.interrupted();
- throw new RuntimeException(ie);
+ if (last == null) {
+ try {
+ // Will block here indefinitely waiting for content. RPC timeouts defend against permanent
+ // hangs here as the call will become closed.
+ last = waitForNext();
+ } catch (InterruptedException ie) {
+ Thread.interrupted();
+ throw new RuntimeException(ie);
+ }
}
if (last instanceof Status) {
throw ((Status) last).asRuntimeException();
@@ -417,4 +490,28 @@
}
}
}
+
+ private static class ThreadlessExecutor implements Executor {
+ private final BlockingQueue<Runnable> queue = new LinkedBlockingQueue<Runnable>();
+
+ /**
+ * Waits until there is a Runnable, then executes it and all queued Runnables after it.
+ */
+ public void waitAndDrain() throws InterruptedException {
+ Runnable runnable = queue.take();
+ while (runnable != null) {
+ try {
+ runnable.run();
+ } catch (Throwable t) {
+ log.log(Level.WARNING, "Runnable threw exception", t);
+ }
+ runnable = queue.poll();
+ }
+ }
+
+ @Override
+ public void execute(Runnable runnable) {
+ queue.add(runnable);
+ }
+ }
}