Add QPS Server and TLS support to QPS Client.
diff --git a/benchmarks/build.gradle b/benchmarks/build.gradle
index 10a843f..9bfc2fc 100644
--- a/benchmarks/build.gradle
+++ b/benchmarks/build.gradle
@@ -2,8 +2,8 @@
 apply plugin: 'protobuf'
 
 description = "gRPC Benchmarks"
+mainClassName = "io.grpc.benchmarks.qps.Main"
 
-mainClassName = "io.grpc.benchmarks.qps.Client"
 
 buildscript {
     repositories {
@@ -14,28 +14,32 @@
     }
 }
 
+configurations {
+    alpnboot
+}
+
 dependencies {
     compile project(':grpc-core'),
             project(':grpc-netty'),
             project(':grpc-okhttp'),
             project(':grpc-stub'),
-            project(':grpc-testing'),
+            project(':grpc-integration-testing'),
             libraries.junit,
             libraries.mockito,
             libraries.hdrhistogram
+
+    // Determine the correct version of Jetty ALPN boot to use based
+    // on the Java version.
+    def alpnboot_prefix = 'org.mortbay.jetty.alpn:alpn-boot:'
+    def alpnboot_version = '8.1.2.v20141202'
+    if (JavaVersion.current().ordinal() < JavaVersion.VERSION_1_8.ordinal()) {
+        alpnboot_version = '7.1.2.v20141202'
+    }
+
+    alpnboot alpnboot_prefix + alpnboot_version
+
+    applicationDefaultJvmArgs = ["-Xbootclasspath/p:" + configurations.alpnboot.asPath]
 }
 
 protobufCodeGenPlugins = ["java_plugin:$rootDir/compiler/build/binaries/java_pluginExecutable/java_plugin"]
 generateProto.dependsOn ':grpc-compiler:java_pluginExecutable'
-
-// Allow execution of test client and server.
-task execute(dependsOn: classes, type:JavaExec) {
-    main = project.hasProperty('mainClass') ? project.mainClass : 'io.grpc.benchmarks.qps.Client'
-    classpath = sourceSets.main.runtimeClasspath
-    workingDir = project.rootDir
-
-    // If appArgs were provided, set the program arguments.
-    if (project.hasProperty("appArgs")) {
-        args = Eval.me(appArgs)
-    }
-}
diff --git a/benchmarks/src/main/java/io/grpc/benchmarks/qps/Main.java b/benchmarks/src/main/java/io/grpc/benchmarks/qps/Main.java
new file mode 100644
index 0000000..120e0ac
--- /dev/null
+++ b/benchmarks/src/main/java/io/grpc/benchmarks/qps/Main.java
@@ -0,0 +1,61 @@
+/*
+ * Copyright 2015, Google Inc. All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ *    * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ *    * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ *
+ *    * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+package io.grpc.benchmarks.qps;
+
+import java.util.Arrays;
+
+public class Main {
+  public static void main(String... args) throws Exception {
+    if (args.length == 0) {
+      printUsage();
+    } else {
+      String arg = args[0].substring(2);
+      if ("server".equals(arg)) {
+        QpsServer server = new QpsServer();
+        server.run(Arrays.copyOfRange(args, 1, args.length));
+      } else if ("client".equals(arg)) {
+        QpsClient client = new QpsClient();
+        client.run(Arrays.copyOfRange(args, 1, args.length));
+      } else {
+        printUsage();
+      }
+    }
+  }
+
+  private static void printUsage() {
+    System.out.println("Usage: Pass one of the two as the FIRST argument."
+                       + "\n"
+                       + "\n  --server     Run the QPS server. No default."
+                       + "\n  --client     Run the QPS client. No default."
+    );
+  }
+}
diff --git a/benchmarks/src/main/java/io/grpc/benchmarks/qps/Client.java b/benchmarks/src/main/java/io/grpc/benchmarks/qps/QpsClient.java
similarity index 83%
rename from benchmarks/src/main/java/io/grpc/benchmarks/qps/Client.java
rename to benchmarks/src/main/java/io/grpc/benchmarks/qps/QpsClient.java
index a1a2c1b..f3cced2 100644
--- a/benchmarks/src/main/java/io/grpc/benchmarks/qps/Client.java
+++ b/benchmarks/src/main/java/io/grpc/benchmarks/qps/QpsClient.java
@@ -35,8 +35,10 @@
 import static grpc.testing.Qpstest.SimpleRequest;
 import static grpc.testing.Qpstest.SimpleResponse;
 import static java.lang.Math.max;
+import static io.grpc.testing.integration.Util.loadCert;
 
 import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.MoreExecutors;
 
 import grpc.testing.Qpstest.PayloadType;
 import grpc.testing.TestServiceGrpc;
@@ -47,9 +49,14 @@
 import io.grpc.transport.netty.NegotiationType;
 import io.grpc.transport.netty.NettyChannelBuilder;
 import io.grpc.transport.okhttp.OkHttpChannelBuilder;
+import io.netty.handler.ssl.SslContext;
 import org.HdrHistogram.Histogram;
 import org.HdrHistogram.HistogramIterationValue;
 
+import java.io.File;
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.CancellationException;
@@ -61,15 +68,13 @@
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.logging.Logger;
 
-// TODO: Add OkHttp and Netty TLS Support
-
 /**
  * Runs lots of RPCs against a QPS Server to test for throughput and latency.
  * It's a Java clone of the C version at
  * https://github.com/grpc/grpc/blob/master/test/cpp/qps/client.cc
  */
-public class Client {
-  private static final Logger log = Logger.getLogger(Client.class.getName());
+public class QpsClient {
+  private static final Logger log = Logger.getLogger(QpsClient.class.getName());
 
   // Can record values between 1 ns and 1 min (60 BILLION NS)
   private static final long HISTOGRAM_MAX_VALUE = 60000000000L;
@@ -84,13 +89,10 @@
   private String serverHost  = "127.0.0.1";
   private int serverPort;
   private boolean okhttp;
+  private boolean enableTls;
+  private boolean useTestCa;
 
-  public static void main(String... args) throws Exception {
-    Client c = new Client();
-    c.run(args);
-  }
-
-  private void run(String[] args) throws Exception {
+  public void run(String[] args) throws Exception {
     if (!parseArgs(args)) {
       return;
     }
@@ -143,14 +145,33 @@
     }
   }
 
-  private Channel newChannel() {
+  private Channel newChannel() throws IOException {
     if (okhttp) {
-      return OkHttpChannelBuilder.forAddress(serverHost, serverPort).build();
-    } else {
-      return NettyChannelBuilder.forAddress(serverHost, serverPort)
-                                .negotiationType(NegotiationType.PLAINTEXT)
-                                .build();
+      if (enableTls) {
+        throw new IllegalStateException("TLS unsupported with okhttp");
+      }
+
+      return OkHttpChannelBuilder.forAddress(serverHost, serverPort)
+                                 // TODO(buchgr): Figure out what "server_threads" means in java
+                                 .executor(MoreExecutors.newDirectExecutorService())
+                                 .build();
     }
+
+    SslContext context = null;
+    InetAddress address = InetAddress.getByName(serverHost);
+    NegotiationType negotiationType = enableTls ? NegotiationType.TLS : NegotiationType.PLAINTEXT;
+    if (enableTls && useTestCa) {
+        // Force the hostname to match the cert the server uses.
+        address = InetAddress.getByAddress("foo.test.google.fr", address.getAddress());
+        File cert = loadCert("ca.pem");
+        context = SslContext.newClientContext(cert);
+      }
+
+    return NettyChannelBuilder.forAddress(new InetSocketAddress(address, serverPort))
+                              .executor(MoreExecutors.newDirectExecutorService())
+                              .negotiationType(negotiationType)
+                              .sslContext(context)
+                              .build();
   }
 
   private boolean parseArgs(String[] args) {
@@ -165,11 +186,11 @@
         }
 
         String[] pair = arg.substring(2).split("=", 2);
-        if (pair.length < 2) {
-          continue;
-        }
         String key = pair[0];
-        String value = pair[1];
+        String value = "";
+        if (pair.length == 2) {
+          value = pair[1];
+        }
 
         if ("client_channels".equals(key)) {
           clientChannels = max(Integer.parseInt(value), 1);
@@ -184,8 +205,14 @@
         } else if ("server_port".equals(key)) {
           serverPort = Integer.parseInt(value);
           hasServerPort = true;
-        } else if ("transport".equals(key)) {
-          okhttp = "okhttp".equals(value);
+        } else if ("okhttp".equals(key)) {
+          okhttp = true;
+        } else if ("enable_tls".equals(key)) {
+          enableTls = true;
+        } else if ("use_testca".equals(key)) {
+          useTestCa = true;
+        } else {
+          System.err.println("Unrecognized argument '" + key + "'.");
         }
       }
 
@@ -204,7 +231,7 @@
   }
 
   private void printUsage() {
-    Client c = new Client();
+    QpsClient c = new QpsClient();
     System.out.println(
       "Usage: [ARGS...]"
       + "\n"
@@ -214,7 +241,8 @@
       + "\n  --client_threads=INT        Number of client threads. Default " + c.clientThreads
       + "\n  --num_rpcs=INT              Number of RPCs per thread. Default " + c.numRpcs
       + "\n  --payload_size=INT          Payload size in bytes. Default " + c.payloadSize
-      + "\n  --transport=(okhttp|netty)  The transport to use. Default netty"
+      + "\n  --enable_tls                Enable TLS. Default disabled."
+      + "\n  --use_testca                Use the provided test certificate for TLS."
     );
   }
 
diff --git a/benchmarks/src/main/java/io/grpc/benchmarks/qps/QpsServer.java b/benchmarks/src/main/java/io/grpc/benchmarks/qps/QpsServer.java
new file mode 100644
index 0000000..ac18ffa
--- /dev/null
+++ b/benchmarks/src/main/java/io/grpc/benchmarks/qps/QpsServer.java
@@ -0,0 +1,246 @@
+/*
+ * Copyright 2015, Google Inc. All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ *    * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ *    * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ *
+ *    * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+package io.grpc.benchmarks.qps;
+
+import static grpc.testing.Qpstest.StatsRequest;
+import static grpc.testing.Qpstest.ServerStats;
+import static grpc.testing.Qpstest.Latencies;
+import static grpc.testing.Qpstest.StartArgs;
+import static grpc.testing.Qpstest.Payload;
+import static grpc.testing.Qpstest.PayloadType;
+import static grpc.testing.Qpstest.SimpleResponse;
+import static grpc.testing.Qpstest.SimpleRequest;
+import static grpc.testing.Qpstest.StreamingInputCallResponse;
+import static grpc.testing.Qpstest.StreamingOutputCallResponse;
+import static grpc.testing.Qpstest.StreamingInputCallRequest;
+import static grpc.testing.Qpstest.StreamingOutputCallRequest;
+import static java.lang.Math.max;
+import static io.grpc.testing.integration.Util.loadCert;
+import static io.grpc.testing.integration.Util.pickUnusedPort;
+
+import com.google.common.util.concurrent.MoreExecutors;
+import com.google.protobuf.ByteString;
+
+import grpc.testing.Qpstest;
+import grpc.testing.TestServiceGrpc;
+import io.grpc.ServerImpl;
+import io.grpc.Status;
+import io.grpc.stub.StreamObserver;
+import io.grpc.transport.netty.NettyServerBuilder;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.handler.ssl.SslContext;
+
+import java.io.File;
+import java.util.concurrent.TimeUnit;
+
+public class QpsServer {
+
+  private boolean enable_tls;
+  private int port = 0;
+  private int serverThreads = 4;
+
+  public void run(String[] args) throws Exception {
+    if (!parseArgs(args)) {
+      return;
+    }
+
+    SslContext sslContext = null;
+    if (enable_tls) {
+      System.out.println("Using fake CA for TLS certificate.\n"
+                         + "Run the Java client with --enable_tls --use_testca");
+
+      File cert = loadCert("server1.pem");
+      File key = loadCert("server1.key");
+      sslContext = SslContext.newServerContext(cert, key);
+    }
+
+    if (port == 0) {
+      port = pickUnusedPort();
+    }
+
+    final EventLoopGroup boss = new NioEventLoopGroup(1);
+    final EventLoopGroup worker = new NioEventLoopGroup(serverThreads);
+
+    final ServerImpl server = NettyServerBuilder
+            .forPort(port)
+            .userBossEventLoopGroup(boss)
+            .workerEventLoopGroup(worker)
+            .addService(TestServiceGrpc.bindService(new TestServiceImpl()))
+            .sslContext(sslContext)
+            // TODO(buchgr): Figure out what "server_threads" means in java
+            .executor(MoreExecutors.newDirectExecutorService())
+            .build();
+    server.start();
+
+    System.out.println("QPS Server started on port " + port);
+
+    Runtime.getRuntime().addShutdownHook(new Thread() {
+      @Override
+      public void run() {
+        try {
+          System.out.println("QPS Server shutting down");
+          server.shutdown();
+          server.awaitTerminated(5, TimeUnit.SECONDS);
+          boss.shutdownGracefully(0, 5, TimeUnit.SECONDS);
+          worker.shutdownGracefully(0, 5, TimeUnit.SECONDS);
+        } catch (Exception e) {
+          e.printStackTrace();
+        }
+      }
+    });
+
+    // This is a hack to remain there until we have figured out how
+    // we deal with daemon threads, because Netty's EventLoops by default
+    // use daemon threads the application would just exit immediately.
+    server.awaitTerminated(100, TimeUnit.DAYS);
+  }
+
+  private boolean parseArgs(String[] args) {
+    try {
+      for (String arg : args) {
+        if (!arg.startsWith("--")) {
+          System.err.println("All arguments must start with '--': " + arg);
+          printUsage();
+          return false;
+        }
+
+        String[] pair = arg.substring(2).split("=", 2);
+        String key = pair[0];
+        String value = "";
+        if (pair.length == 2) {
+          value = pair[1];
+        }
+
+        if ("server_threads".equals(key)) {
+          serverThreads = max(Integer.parseInt(value), 1);
+        } else if ("port".equals(key)) {
+          port = Integer.parseInt(value);
+        } else if ("enable_tls".equals(key)) {
+          enable_tls = true;
+        } else {
+          System.err.println("Unrecognized argument '" + key + "'.");
+        }
+      }
+    } catch (Exception e) {
+      e.printStackTrace();
+      printUsage();
+      return false;
+    }
+
+    return true;
+  }
+
+  private void printUsage() {
+    QpsServer s = new QpsServer();
+    System.out.println(
+            "Usage: [ARGS...]"
+            + "\n"
+            + "\n  --port             Port of the server. By default a random port is chosen."
+            + "\n  --server_threads   Number of client threads. Default " + s.serverThreads
+            + "\n  --enable_tls       Enable TLS. Default disabled."
+    );
+  }
+
+  private static class TestServiceImpl implements TestServiceGrpc.TestService {
+
+    @Override
+    public void startTest(StartArgs request, StreamObserver<Latencies> responseObserver) {
+      throw Status.UNIMPLEMENTED.asRuntimeException();
+    }
+
+    @Override
+    public void collectServerStats(StatsRequest request,
+                                   StreamObserver<ServerStats> responseObserver) {
+      double nowSeconds = System.currentTimeMillis() / 1000.0;
+
+      ServerStats stats = ServerStats.newBuilder()
+                                     .setTimeNow(nowSeconds)
+                                     .setTimeUser(0)
+                                     .setTimeSystem(0)
+                                     .build();
+      responseObserver.onValue(stats);
+      responseObserver.onCompleted();
+    }
+
+    @Override
+    public void unaryCall(SimpleRequest request,
+                          StreamObserver<Qpstest.SimpleResponse> responseObserver) {
+      if (!request.hasResponseSize()) {
+        throw Status.INTERNAL.augmentDescription("responseSize required").asRuntimeException();
+      } else if (!request.hasResponseType()) {
+        throw Status.INTERNAL.augmentDescription("responseType required").asRuntimeException();
+      } else if (request.getResponseSize() > 0) {
+        // I just added this condition to mimic the C++ QPS Server behaviour.
+        if (!PayloadType.COMPRESSABLE.equals(request.getResponseType())) {
+          throw Status.INTERNAL.augmentDescription("Error creating payload.").asRuntimeException();
+        }
+
+        ByteString body = ByteString.copyFrom(new byte[request.getResponseSize()]);
+        PayloadType type = request.getResponseType();
+
+        Payload payload = Payload.newBuilder().setType(type).setBody(body).build();
+        SimpleResponse response = SimpleResponse.newBuilder().setPayload(payload).build();
+
+        responseObserver.onValue(response);
+        responseObserver.onCompleted();
+      } else {
+        responseObserver.onValue(SimpleResponse.getDefaultInstance());
+        responseObserver.onCompleted();
+      }
+    }
+
+    @Override
+    public void streamingOutputCall(StreamingOutputCallRequest request,
+                                    StreamObserver<StreamingOutputCallResponse> responseObserver) {
+      throw Status.UNIMPLEMENTED.asRuntimeException();
+    }
+
+    @Override
+    public StreamObserver<StreamingInputCallRequest>
+    streamingInputCall(StreamObserver<StreamingInputCallResponse> responseObserver) {
+      throw Status.UNIMPLEMENTED.asRuntimeException();
+    }
+
+    @Override
+    public StreamObserver<StreamingOutputCallRequest>
+    fullDuplexCall(StreamObserver<StreamingOutputCallResponse> responseObserver) {
+      throw Status.UNIMPLEMENTED.asRuntimeException();
+    }
+
+    @Override
+    public StreamObserver<StreamingOutputCallRequest>
+    halfDuplexCall(StreamObserver<StreamingOutputCallResponse> responseObserver) {
+      throw Status.UNIMPLEMENTED.asRuntimeException();
+    }
+  }
+}