Add QPS Server and TLS support to QPS Client.
diff --git a/benchmarks/build.gradle b/benchmarks/build.gradle
index 10a843f..9bfc2fc 100644
--- a/benchmarks/build.gradle
+++ b/benchmarks/build.gradle
@@ -2,8 +2,8 @@
apply plugin: 'protobuf'
description = "gRPC Benchmarks"
+mainClassName = "io.grpc.benchmarks.qps.Main"
-mainClassName = "io.grpc.benchmarks.qps.Client"
buildscript {
repositories {
@@ -14,28 +14,32 @@
}
}
+configurations {
+ alpnboot
+}
+
dependencies {
compile project(':grpc-core'),
project(':grpc-netty'),
project(':grpc-okhttp'),
project(':grpc-stub'),
- project(':grpc-testing'),
+ project(':grpc-integration-testing'),
libraries.junit,
libraries.mockito,
libraries.hdrhistogram
+
+ // Determine the correct version of Jetty ALPN boot to use based
+ // on the Java version.
+ def alpnboot_prefix = 'org.mortbay.jetty.alpn:alpn-boot:'
+ def alpnboot_version = '8.1.2.v20141202'
+ if (JavaVersion.current().ordinal() < JavaVersion.VERSION_1_8.ordinal()) {
+ alpnboot_version = '7.1.2.v20141202'
+ }
+
+ alpnboot alpnboot_prefix + alpnboot_version
+
+ applicationDefaultJvmArgs = ["-Xbootclasspath/p:" + configurations.alpnboot.asPath]
}
protobufCodeGenPlugins = ["java_plugin:$rootDir/compiler/build/binaries/java_pluginExecutable/java_plugin"]
generateProto.dependsOn ':grpc-compiler:java_pluginExecutable'
-
-// Allow execution of test client and server.
-task execute(dependsOn: classes, type:JavaExec) {
- main = project.hasProperty('mainClass') ? project.mainClass : 'io.grpc.benchmarks.qps.Client'
- classpath = sourceSets.main.runtimeClasspath
- workingDir = project.rootDir
-
- // If appArgs were provided, set the program arguments.
- if (project.hasProperty("appArgs")) {
- args = Eval.me(appArgs)
- }
-}
diff --git a/benchmarks/src/main/java/io/grpc/benchmarks/qps/Main.java b/benchmarks/src/main/java/io/grpc/benchmarks/qps/Main.java
new file mode 100644
index 0000000..120e0ac
--- /dev/null
+++ b/benchmarks/src/main/java/io/grpc/benchmarks/qps/Main.java
@@ -0,0 +1,61 @@
+/*
+ * Copyright 2015, Google Inc. All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ *
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+package io.grpc.benchmarks.qps;
+
+import java.util.Arrays;
+
+public class Main {
+ public static void main(String... args) throws Exception {
+ if (args.length == 0) {
+ printUsage();
+ } else {
+ String arg = args[0].substring(2);
+ if ("server".equals(arg)) {
+ QpsServer server = new QpsServer();
+ server.run(Arrays.copyOfRange(args, 1, args.length));
+ } else if ("client".equals(arg)) {
+ QpsClient client = new QpsClient();
+ client.run(Arrays.copyOfRange(args, 1, args.length));
+ } else {
+ printUsage();
+ }
+ }
+ }
+
+ private static void printUsage() {
+ System.out.println("Usage: Pass one of the two as the FIRST argument."
+ + "\n"
+ + "\n --server Run the QPS server. No default."
+ + "\n --client Run the QPS client. No default."
+ );
+ }
+}
diff --git a/benchmarks/src/main/java/io/grpc/benchmarks/qps/Client.java b/benchmarks/src/main/java/io/grpc/benchmarks/qps/QpsClient.java
similarity index 83%
rename from benchmarks/src/main/java/io/grpc/benchmarks/qps/Client.java
rename to benchmarks/src/main/java/io/grpc/benchmarks/qps/QpsClient.java
index a1a2c1b..f3cced2 100644
--- a/benchmarks/src/main/java/io/grpc/benchmarks/qps/Client.java
+++ b/benchmarks/src/main/java/io/grpc/benchmarks/qps/QpsClient.java
@@ -35,8 +35,10 @@
import static grpc.testing.Qpstest.SimpleRequest;
import static grpc.testing.Qpstest.SimpleResponse;
import static java.lang.Math.max;
+import static io.grpc.testing.integration.Util.loadCert;
import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.MoreExecutors;
import grpc.testing.Qpstest.PayloadType;
import grpc.testing.TestServiceGrpc;
@@ -47,9 +49,14 @@
import io.grpc.transport.netty.NegotiationType;
import io.grpc.transport.netty.NettyChannelBuilder;
import io.grpc.transport.okhttp.OkHttpChannelBuilder;
+import io.netty.handler.ssl.SslContext;
import org.HdrHistogram.Histogram;
import org.HdrHistogram.HistogramIterationValue;
+import java.io.File;
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CancellationException;
@@ -61,15 +68,13 @@
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.logging.Logger;
-// TODO: Add OkHttp and Netty TLS Support
-
/**
* Runs lots of RPCs against a QPS Server to test for throughput and latency.
* It's a Java clone of the C version at
* https://github.com/grpc/grpc/blob/master/test/cpp/qps/client.cc
*/
-public class Client {
- private static final Logger log = Logger.getLogger(Client.class.getName());
+public class QpsClient {
+ private static final Logger log = Logger.getLogger(QpsClient.class.getName());
// Can record values between 1 ns and 1 min (60 BILLION NS)
private static final long HISTOGRAM_MAX_VALUE = 60000000000L;
@@ -84,13 +89,10 @@
private String serverHost = "127.0.0.1";
private int serverPort;
private boolean okhttp;
+ private boolean enableTls;
+ private boolean useTestCa;
- public static void main(String... args) throws Exception {
- Client c = new Client();
- c.run(args);
- }
-
- private void run(String[] args) throws Exception {
+ public void run(String[] args) throws Exception {
if (!parseArgs(args)) {
return;
}
@@ -143,14 +145,33 @@
}
}
- private Channel newChannel() {
+ private Channel newChannel() throws IOException {
if (okhttp) {
- return OkHttpChannelBuilder.forAddress(serverHost, serverPort).build();
- } else {
- return NettyChannelBuilder.forAddress(serverHost, serverPort)
- .negotiationType(NegotiationType.PLAINTEXT)
- .build();
+ if (enableTls) {
+ throw new IllegalStateException("TLS unsupported with okhttp");
+ }
+
+ return OkHttpChannelBuilder.forAddress(serverHost, serverPort)
+ // TODO(buchgr): Figure out what "server_threads" means in java
+ .executor(MoreExecutors.newDirectExecutorService())
+ .build();
}
+
+ SslContext context = null;
+ InetAddress address = InetAddress.getByName(serverHost);
+ NegotiationType negotiationType = enableTls ? NegotiationType.TLS : NegotiationType.PLAINTEXT;
+ if (enableTls && useTestCa) {
+ // Force the hostname to match the cert the server uses.
+ address = InetAddress.getByAddress("foo.test.google.fr", address.getAddress());
+ File cert = loadCert("ca.pem");
+ context = SslContext.newClientContext(cert);
+ }
+
+ return NettyChannelBuilder.forAddress(new InetSocketAddress(address, serverPort))
+ .executor(MoreExecutors.newDirectExecutorService())
+ .negotiationType(negotiationType)
+ .sslContext(context)
+ .build();
}
private boolean parseArgs(String[] args) {
@@ -165,11 +186,11 @@
}
String[] pair = arg.substring(2).split("=", 2);
- if (pair.length < 2) {
- continue;
- }
String key = pair[0];
- String value = pair[1];
+ String value = "";
+ if (pair.length == 2) {
+ value = pair[1];
+ }
if ("client_channels".equals(key)) {
clientChannels = max(Integer.parseInt(value), 1);
@@ -184,8 +205,14 @@
} else if ("server_port".equals(key)) {
serverPort = Integer.parseInt(value);
hasServerPort = true;
- } else if ("transport".equals(key)) {
- okhttp = "okhttp".equals(value);
+ } else if ("okhttp".equals(key)) {
+ okhttp = true;
+ } else if ("enable_tls".equals(key)) {
+ enableTls = true;
+ } else if ("use_testca".equals(key)) {
+ useTestCa = true;
+ } else {
+ System.err.println("Unrecognized argument '" + key + "'.");
}
}
@@ -204,7 +231,7 @@
}
private void printUsage() {
- Client c = new Client();
+ QpsClient c = new QpsClient();
System.out.println(
"Usage: [ARGS...]"
+ "\n"
@@ -214,7 +241,8 @@
+ "\n --client_threads=INT Number of client threads. Default " + c.clientThreads
+ "\n --num_rpcs=INT Number of RPCs per thread. Default " + c.numRpcs
+ "\n --payload_size=INT Payload size in bytes. Default " + c.payloadSize
- + "\n --transport=(okhttp|netty) The transport to use. Default netty"
+ + "\n --enable_tls Enable TLS. Default disabled."
+ + "\n --use_testca Use the provided test certificate for TLS."
);
}
diff --git a/benchmarks/src/main/java/io/grpc/benchmarks/qps/QpsServer.java b/benchmarks/src/main/java/io/grpc/benchmarks/qps/QpsServer.java
new file mode 100644
index 0000000..ac18ffa
--- /dev/null
+++ b/benchmarks/src/main/java/io/grpc/benchmarks/qps/QpsServer.java
@@ -0,0 +1,246 @@
+/*
+ * Copyright 2015, Google Inc. All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ *
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+package io.grpc.benchmarks.qps;
+
+import static grpc.testing.Qpstest.StatsRequest;
+import static grpc.testing.Qpstest.ServerStats;
+import static grpc.testing.Qpstest.Latencies;
+import static grpc.testing.Qpstest.StartArgs;
+import static grpc.testing.Qpstest.Payload;
+import static grpc.testing.Qpstest.PayloadType;
+import static grpc.testing.Qpstest.SimpleResponse;
+import static grpc.testing.Qpstest.SimpleRequest;
+import static grpc.testing.Qpstest.StreamingInputCallResponse;
+import static grpc.testing.Qpstest.StreamingOutputCallResponse;
+import static grpc.testing.Qpstest.StreamingInputCallRequest;
+import static grpc.testing.Qpstest.StreamingOutputCallRequest;
+import static java.lang.Math.max;
+import static io.grpc.testing.integration.Util.loadCert;
+import static io.grpc.testing.integration.Util.pickUnusedPort;
+
+import com.google.common.util.concurrent.MoreExecutors;
+import com.google.protobuf.ByteString;
+
+import grpc.testing.Qpstest;
+import grpc.testing.TestServiceGrpc;
+import io.grpc.ServerImpl;
+import io.grpc.Status;
+import io.grpc.stub.StreamObserver;
+import io.grpc.transport.netty.NettyServerBuilder;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.handler.ssl.SslContext;
+
+import java.io.File;
+import java.util.concurrent.TimeUnit;
+
+public class QpsServer {
+
+ private boolean enable_tls;
+ private int port = 0;
+ private int serverThreads = 4;
+
+ public void run(String[] args) throws Exception {
+ if (!parseArgs(args)) {
+ return;
+ }
+
+ SslContext sslContext = null;
+ if (enable_tls) {
+ System.out.println("Using fake CA for TLS certificate.\n"
+ + "Run the Java client with --enable_tls --use_testca");
+
+ File cert = loadCert("server1.pem");
+ File key = loadCert("server1.key");
+ sslContext = SslContext.newServerContext(cert, key);
+ }
+
+ if (port == 0) {
+ port = pickUnusedPort();
+ }
+
+ final EventLoopGroup boss = new NioEventLoopGroup(1);
+ final EventLoopGroup worker = new NioEventLoopGroup(serverThreads);
+
+ final ServerImpl server = NettyServerBuilder
+ .forPort(port)
+ .userBossEventLoopGroup(boss)
+ .workerEventLoopGroup(worker)
+ .addService(TestServiceGrpc.bindService(new TestServiceImpl()))
+ .sslContext(sslContext)
+ // TODO(buchgr): Figure out what "server_threads" means in java
+ .executor(MoreExecutors.newDirectExecutorService())
+ .build();
+ server.start();
+
+ System.out.println("QPS Server started on port " + port);
+
+ Runtime.getRuntime().addShutdownHook(new Thread() {
+ @Override
+ public void run() {
+ try {
+ System.out.println("QPS Server shutting down");
+ server.shutdown();
+ server.awaitTerminated(5, TimeUnit.SECONDS);
+ boss.shutdownGracefully(0, 5, TimeUnit.SECONDS);
+ worker.shutdownGracefully(0, 5, TimeUnit.SECONDS);
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+ });
+
+ // This is a hack to remain there until we have figured out how
+ // we deal with daemon threads, because Netty's EventLoops by default
+ // use daemon threads the application would just exit immediately.
+ server.awaitTerminated(100, TimeUnit.DAYS);
+ }
+
+ private boolean parseArgs(String[] args) {
+ try {
+ for (String arg : args) {
+ if (!arg.startsWith("--")) {
+ System.err.println("All arguments must start with '--': " + arg);
+ printUsage();
+ return false;
+ }
+
+ String[] pair = arg.substring(2).split("=", 2);
+ String key = pair[0];
+ String value = "";
+ if (pair.length == 2) {
+ value = pair[1];
+ }
+
+ if ("server_threads".equals(key)) {
+ serverThreads = max(Integer.parseInt(value), 1);
+ } else if ("port".equals(key)) {
+ port = Integer.parseInt(value);
+ } else if ("enable_tls".equals(key)) {
+ enable_tls = true;
+ } else {
+ System.err.println("Unrecognized argument '" + key + "'.");
+ }
+ }
+ } catch (Exception e) {
+ e.printStackTrace();
+ printUsage();
+ return false;
+ }
+
+ return true;
+ }
+
+ private void printUsage() {
+ QpsServer s = new QpsServer();
+ System.out.println(
+ "Usage: [ARGS...]"
+ + "\n"
+ + "\n --port Port of the server. By default a random port is chosen."
+ + "\n --server_threads Number of client threads. Default " + s.serverThreads
+ + "\n --enable_tls Enable TLS. Default disabled."
+ );
+ }
+
+ private static class TestServiceImpl implements TestServiceGrpc.TestService {
+
+ @Override
+ public void startTest(StartArgs request, StreamObserver<Latencies> responseObserver) {
+ throw Status.UNIMPLEMENTED.asRuntimeException();
+ }
+
+ @Override
+ public void collectServerStats(StatsRequest request,
+ StreamObserver<ServerStats> responseObserver) {
+ double nowSeconds = System.currentTimeMillis() / 1000.0;
+
+ ServerStats stats = ServerStats.newBuilder()
+ .setTimeNow(nowSeconds)
+ .setTimeUser(0)
+ .setTimeSystem(0)
+ .build();
+ responseObserver.onValue(stats);
+ responseObserver.onCompleted();
+ }
+
+ @Override
+ public void unaryCall(SimpleRequest request,
+ StreamObserver<Qpstest.SimpleResponse> responseObserver) {
+ if (!request.hasResponseSize()) {
+ throw Status.INTERNAL.augmentDescription("responseSize required").asRuntimeException();
+ } else if (!request.hasResponseType()) {
+ throw Status.INTERNAL.augmentDescription("responseType required").asRuntimeException();
+ } else if (request.getResponseSize() > 0) {
+ // I just added this condition to mimic the C++ QPS Server behaviour.
+ if (!PayloadType.COMPRESSABLE.equals(request.getResponseType())) {
+ throw Status.INTERNAL.augmentDescription("Error creating payload.").asRuntimeException();
+ }
+
+ ByteString body = ByteString.copyFrom(new byte[request.getResponseSize()]);
+ PayloadType type = request.getResponseType();
+
+ Payload payload = Payload.newBuilder().setType(type).setBody(body).build();
+ SimpleResponse response = SimpleResponse.newBuilder().setPayload(payload).build();
+
+ responseObserver.onValue(response);
+ responseObserver.onCompleted();
+ } else {
+ responseObserver.onValue(SimpleResponse.getDefaultInstance());
+ responseObserver.onCompleted();
+ }
+ }
+
+ @Override
+ public void streamingOutputCall(StreamingOutputCallRequest request,
+ StreamObserver<StreamingOutputCallResponse> responseObserver) {
+ throw Status.UNIMPLEMENTED.asRuntimeException();
+ }
+
+ @Override
+ public StreamObserver<StreamingInputCallRequest>
+ streamingInputCall(StreamObserver<StreamingInputCallResponse> responseObserver) {
+ throw Status.UNIMPLEMENTED.asRuntimeException();
+ }
+
+ @Override
+ public StreamObserver<StreamingOutputCallRequest>
+ fullDuplexCall(StreamObserver<StreamingOutputCallResponse> responseObserver) {
+ throw Status.UNIMPLEMENTED.asRuntimeException();
+ }
+
+ @Override
+ public StreamObserver<StreamingOutputCallRequest>
+ halfDuplexCall(StreamObserver<StreamingOutputCallResponse> responseObserver) {
+ throw Status.UNIMPLEMENTED.asRuntimeException();
+ }
+ }
+}