Add NPN/ALPN support via Jetty extensions
Resurrect integration tests that pass through GFE for HTTP2+Netty
Misc improvements to hexa environment

-------------
Created by MOE: http://code.google.com/p/moe-java
MOE_MIGRATED_REVID=70683236
diff --git a/core/src/main/java/com/google/net/stubby/http2/netty/Http2Client.java b/core/src/main/java/com/google/net/stubby/http2/netty/Http2Client.java
index 694c07d..4324450 100644
--- a/core/src/main/java/com/google/net/stubby/http2/netty/Http2Client.java
+++ b/core/src/main/java/com/google/net/stubby/http2/netty/Http2Client.java
@@ -1,6 +1,10 @@
 package com.google.net.stubby.http2.netty;
 
+import com.google.common.base.Preconditions;
 import com.google.common.base.Throwables;
+import com.google.common.collect.ImmutableList;
+import com.google.common.logging.FormattingLogger;
+import com.google.common.util.concurrent.SettableFuture;
 import com.google.net.stubby.RequestRegistry;
 import com.google.net.stubby.Session;
 
@@ -8,16 +12,33 @@
 import io.netty.buffer.UnpooledByteBufAllocator;
 import io.netty.channel.Channel;
 import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelHandlerAdapter;
+import io.netty.channel.ChannelHandlerContext;
 import io.netty.channel.ChannelInitializer;
 import io.netty.channel.ChannelOption;
 import io.netty.channel.EventLoopGroup;
 import io.netty.channel.nio.NioEventLoopGroup;
 import io.netty.channel.socket.SocketChannel;
 import io.netty.channel.socket.nio.NioSocketChannel;
+import io.netty.handler.codec.http.DefaultHttpRequest;
+import io.netty.handler.codec.http.HttpClientCodec;
+import io.netty.handler.codec.http.HttpClientUpgradeHandler;
+import io.netty.handler.codec.http.HttpMethod;
+import io.netty.handler.codec.http.HttpVersion;
+import io.netty.handler.codec.http2.Http2ClientUpgradeCodec;
 import io.netty.handler.ssl.SslHandler;
+import io.netty.util.concurrent.Future;
 import io.netty.util.concurrent.GenericFutureListener;
+import io.netty.util.concurrent.Promise;
 
-import javax.annotation.Nullable;
+import java.lang.reflect.InvocationHandler;
+import java.lang.reflect.Method;
+import java.lang.reflect.Proxy;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
 import javax.net.ssl.SSLEngine;
 
 /**
@@ -25,59 +46,165 @@
  * with protocol bindings.
  */
 public class Http2Client {
+  /** HTTP2 draft 12 protocol name. */
+  public static final String HTTP_VERSION_NAME = "h2-12";
+
+  private static final String[] JETTY_TLS_NEGOTIATION_IMPL = {
+      "org.eclipse.jetty.alpn.ALPN", // Prefer ALPN to NPN so try it first
+      "org.eclipse.jetty.npn.NextProtoNego"};
+
+  private static final FormattingLogger log = FormattingLogger.getLoggerForCallerClass();
+
   private final String host;
   private final int port;
   private final RequestRegistry requestRegistry;
   private final SSLEngine sslEngine;
+  private final boolean usePlaintextUpgrade;
   private Channel channel;
 
-  public Http2Client(String host, int port, RequestRegistry requestRegistry) {
-    this(host, port, requestRegistry, null);
+  public Http2Client(String host, int port, RequestRegistry requestRegistry,
+                     boolean usePlaintextUpgrade) {
+    this.host = Preconditions.checkNotNull(host);
+    this.port = port;
+    this.requestRegistry = Preconditions.checkNotNull(requestRegistry);
+    this.usePlaintextUpgrade = usePlaintextUpgrade;
+    this.sslEngine = null;
   }
 
-  public Http2Client(String host, int port, RequestRegistry requestRegistry,
-                    @Nullable SSLEngine sslEngine) {
-    this.host = host;
+  public Http2Client(String host, int port, RequestRegistry requestRegistry, SSLEngine sslEngine) {
+    this.host = Preconditions.checkNotNull(host);
     this.port = port;
-    this.requestRegistry = requestRegistry;
-    this.sslEngine = sslEngine;
-    // TODO(user): NPN support
-    if (sslEngine != null) {
-      sslEngine.setUseClientMode(true);
-    }
+    this.requestRegistry = Preconditions.checkNotNull(requestRegistry);
+    this.sslEngine = Preconditions.checkNotNull(sslEngine);
+    this.sslEngine.setUseClientMode(true);
+    this.usePlaintextUpgrade = false;
   }
 
   public Session startAndWait() {
-    EventLoopGroup workerGroup = new NioEventLoopGroup();
+    final Http2Codec http2Codec = new Http2Codec(requestRegistry);
+    if (sslEngine != null) {
+      startTLS(http2Codec);
+    } else {
+      if (usePlaintextUpgrade) {
+        startPlaintextUpgrade(http2Codec);
+      } else {
+        startPlaintext(http2Codec);
+      }
+    }
+    return new Http2Session(http2Codec.getWriter(), requestRegistry);
+  }
+
+  private void startTLS(final Http2Codec http2Codec) {
+    SettableFuture<Void> tlsNegotiatedHttp2 = SettableFuture.create();
+    if (!installJettyTLSProtocolSelection(sslEngine, tlsNegotiatedHttp2)) {
+      throw new IllegalStateException("NPN/ALPN extensions not installed");
+    }
+    final CountDownLatch sslCompletion = new CountDownLatch(1);
+    Channel channel = connect(new ChannelInitializer<SocketChannel>() {
+      @Override
+      public void initChannel(SocketChannel ch) throws Exception {
+        SslHandler sslHandler = new SslHandler(sslEngine, false);
+        sslHandler.handshakeFuture().addListener(
+            new GenericFutureListener<Future<? super Channel>>() {
+              @Override
+              public void operationComplete(Future<? super Channel> future) throws Exception {
+                sslCompletion.countDown();
+              }
+            });
+        ch.pipeline().addLast(sslHandler);
+        ch.pipeline().addLast(http2Codec);
+      }
+    });
     try {
-      Bootstrap b = new Bootstrap(); // (1)
-      b.group(workerGroup); // (2)
-      b.channel(NioSocketChannel.class); // (3)
-      b.option(ChannelOption.SO_KEEPALIVE, true); // (4)
-      // TODO(user): Evaluate use of pooled allocator
-      b.option(ChannelOption.ALLOCATOR, UnpooledByteBufAllocator.DEFAULT);
-      final Http2Codec http2Codec = new Http2Codec(requestRegistry);
-      b.handler(new ChannelInitializer<SocketChannel>() {
-        @Override
-        public void initChannel(SocketChannel ch) throws Exception {
-          if (sslEngine != null) {
-            // Assume TLS when using SSL
-            ch.pipeline().addLast(new SslHandler(sslEngine, false));
-          }
-          ch.pipeline().addLast(http2Codec);
-        }
-      });
-      // Start the client.
-      ChannelFuture channelFuture = b.connect(host, port);
+      // Wait for SSL negotiation to complete
+      if (!sslCompletion.await(20, TimeUnit.SECONDS)) {
+        throw new IllegalStateException("Failed to negotiate TLS");
+      }
+      // Wait for NPN/ALPN negotation to complete. Will throw if failed.
+      tlsNegotiatedHttp2.get(5, TimeUnit.SECONDS);
+    } catch (Exception e) {
+      // Attempt to close the channel before propagating the error
+      channel.close();
+      throw new IllegalStateException("Error waiting for TLS negotiation", e);
+    }
+  }
+
+  /**
+   * Start the connection and use the plaintext upgrade mechanism from HTTP/1.1 to HTTP2.
+   */
+  private void startPlaintextUpgrade(final Http2Codec http2Codec) {
+    // Register the plaintext upgrader
+    Http2ClientUpgradeCodec upgradeCodec = new Http2ClientUpgradeCodec(http2Codec);
+    HttpClientCodec httpClientCodec = new HttpClientCodec();
+    final HttpClientUpgradeHandler upgrader = new HttpClientUpgradeHandler(httpClientCodec,
+        upgradeCodec, 1000);
+    final UpgradeCompletionHandler completionHandler = new UpgradeCompletionHandler();
+
+    Channel channel = connect(new ChannelInitializer<SocketChannel>() {
+      @Override
+      public void initChannel(SocketChannel ch) throws Exception {
+        ch.pipeline().addLast(upgrader);
+        ch.pipeline().addLast(completionHandler);
+      }
+    });
+
+    try {
+      // Trigger the HTTP/1.1 plaintext upgrade protocol by issuing an HTTP request
+      // which causes the upgrade headers to be added
+      Promise<Void> upgradePromise = completionHandler.getUpgradePromise();
+      DefaultHttpRequest upgradeTrigger =
+          new DefaultHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, "/");
+      channel.writeAndFlush(upgradeTrigger);
+      // Wait for the upgrade to complete
+      upgradePromise.get(5, TimeUnit.SECONDS);
+    } catch (Exception e) {
+      // Attempt to close the channel before propagating the error
+      channel.close();
+      throw new IllegalStateException("Error waiting for plaintext protocol upgrade", e);
+    }
+  }
+
+  /**
+   * Start the connection and simply assume the protocol to already be negotiated.
+   */
+  private void startPlaintext(final Http2Codec http2Codec) {
+    connect(new ChannelInitializer<SocketChannel>() {
+      @Override
+      public void initChannel(SocketChannel ch) throws Exception {
+        ch.pipeline().addLast(http2Codec);
+      }
+    });
+  }
+
+  /**
+   * Configure the bootstrap options for the connection.
+   */
+  private Channel connect(ChannelInitializer<SocketChannel> handler) {
+    // Configure worker pools and buffer allocator
+    EventLoopGroup workerGroup = new NioEventLoopGroup();
+    Bootstrap b = new Bootstrap();
+    b.group(workerGroup);
+    b.channel(NioSocketChannel.class);
+    b.option(ChannelOption.SO_KEEPALIVE, true);
+    // TODO(user): Evaluate use of pooled allocator
+    b.option(ChannelOption.ALLOCATOR, UnpooledByteBufAllocator.DEFAULT);
+
+    // Install the handler
+    b.handler(handler);
+
+    // Connect and wait for connection to be available
+    ChannelFuture channelFuture = b.connect(host, port);
+    try {
       // Wait for the connection
-      channelFuture.sync(); // (5)
+      channelFuture.get(5, TimeUnit.SECONDS);
       channel = channelFuture.channel();
       ChannelFuture closeFuture = channel.closeFuture();
-      closeFuture.addListener(new WorkerCleanupListener(workerGroup));
-      return new Http2Session(http2Codec.getWriter(), requestRegistry);
+      closeFuture.addListener(new WorkerCleanupListener(b.group()));
+      return channel;
+    } catch (TimeoutException te) {
+      throw new IllegalStateException("Timeout waiting for connection to " + host + ":" + port, te);
     } catch (Throwable t) {
-      workerGroup.shutdownGracefully();
-      throw Throwables.propagate(t);
+      throw new IllegalStateException("Error connecting to " + host + ":" + port, t);
     }
   }
 
@@ -106,5 +233,130 @@
     }
   }
 
+  /**
+   * Report protocol upgrade completion using a promise.
+   */
+  private class UpgradeCompletionHandler extends ChannelHandlerAdapter {
 
+    private Promise<Void> upgradePromise;
+
+    @Override
+    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
+      upgradePromise = ctx.newPromise();
+    }
+
+    public Promise<Void> getUpgradePromise() {
+      return upgradePromise;
+    }
+
+    @Override
+    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
+      if (!upgradePromise.isDone()) {
+        if (evt == HttpClientUpgradeHandler.UpgradeEvent.UPGRADE_REJECTED) {
+          upgradePromise.setFailure(new Throwable());
+        } else if (evt == HttpClientUpgradeHandler.UpgradeEvent.UPGRADE_SUCCESSFUL) {
+          upgradePromise.setSuccess(null);
+          ctx.pipeline().remove(this);
+        }
+      }
+    }
+
+    @Override
+    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
+      super.channelInactive(ctx);
+      if (!upgradePromise.isDone()) {
+        upgradePromise.setFailure(new Throwable());
+      }
+    }
+
+    @Override
+    public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
+      super.channelUnregistered(ctx);
+      if (!upgradePromise.isDone()) {
+        upgradePromise.setFailure(new Throwable());
+      }
+    }
+
+    @Override
+    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
+      super.exceptionCaught(ctx, cause);
+      if (!upgradePromise.isDone()) {
+        upgradePromise.setFailure(cause);
+      }
+    }
+  }
+
+  /**
+   * Find Jetty's TLS NPN/ALPN extensions and attempt to use them
+   *
+   * @return true if NPN/ALPN support is available.
+   */
+  private static boolean installJettyTLSProtocolSelection(final SSLEngine engine,
+          final SettableFuture<Void> protocolNegotiated) {
+    for (String protocolNegoClassName : JETTY_TLS_NEGOTIATION_IMPL) {
+      try {
+        Class<?> negoClass;
+        try {
+          negoClass = Class.forName(protocolNegoClassName);
+        } catch (ClassNotFoundException ignored) {
+          // Not on the classpath.
+          log.warningfmt("Jetty extension %s not found", protocolNegoClassName);
+          continue;
+        }
+        Class<?> providerClass = Class.forName(protocolNegoClassName + "$Provider");
+        Class<?> clientProviderClass = Class.forName(protocolNegoClassName + "$ClientProvider");
+        Method putMethod = negoClass.getMethod("put", SSLEngine.class, providerClass);
+        final Method removeMethod = negoClass.getMethod("remove", SSLEngine.class);
+        putMethod.invoke(null, engine, Proxy.newProxyInstance(
+            Http2Client.class.getClassLoader(),
+            new Class[]{clientProviderClass},
+            new InvocationHandler() {
+              @Override
+              public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
+                String methodName = method.getName();
+                switch (methodName) {
+                  case "supports":
+                    // both
+                    return true;
+                  case "unsupported":
+                    // both
+                    removeMethod.invoke(null, engine);
+                    protocolNegotiated.setException(
+                        new IllegalStateException("ALPN/NPN not supported by server"));
+                    return null;
+                  case "protocols":
+                    // ALPN only
+                    return ImmutableList.of(HTTP_VERSION_NAME);
+                  case "selected":
+                    // ALPN only
+                    // Only 'supports' one protocol so we know what was 'selected.
+                    removeMethod.invoke(null, engine);
+                    protocolNegotiated.set(null);
+                    return null;
+                  case "selectProtocol":
+                    // NPN only
+                    @SuppressWarnings("unchecked")
+                    List<String> names = (List<String>) args[0];
+                    for (String name : names) {
+                      if (name.startsWith(HTTP_VERSION_NAME)) {
+                        protocolNegotiated.set(null);
+                        return name;
+                      }
+                    }
+                    protocolNegotiated.setException(
+                        new IllegalStateException("Protocol not available via ALPN/NPN: " + names));
+                    removeMethod.invoke(null, engine);
+                    return null;
+                }
+                throw new IllegalStateException("Unknown method " + methodName);
+              }
+            }));
+        return true;
+      } catch (Exception e) {
+        log.severefmt(e, "Unable to initialize protocol negotation for %s",
+            protocolNegoClassName);
+      }
+    }
+    return false;
+  }
 }
diff --git a/core/src/main/java/com/google/net/stubby/http2/netty/Http2Codec.java b/core/src/main/java/com/google/net/stubby/http2/netty/Http2Codec.java
index 19eed75..479a66c 100644
--- a/core/src/main/java/com/google/net/stubby/http2/netty/Http2Codec.java
+++ b/core/src/main/java/com/google/net/stubby/http2/netty/Http2Codec.java
@@ -14,7 +14,14 @@
 import io.netty.buffer.ByteBuf;
 import io.netty.channel.ChannelFuture;
 import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelPromise;
 import io.netty.handler.codec.http2.AbstractHttp2ConnectionHandler;
+import io.netty.handler.codec.http2.DefaultHttp2Connection;
+import io.netty.handler.codec.http2.DefaultHttp2FrameReader;
+import io.netty.handler.codec.http2.DefaultHttp2FrameWriter;
+import io.netty.handler.codec.http2.DefaultHttp2InboundFlowController;
+import io.netty.handler.codec.http2.DefaultHttp2OutboundFlowController;
+import io.netty.handler.codec.http2.Http2Connection;
 import io.netty.handler.codec.http2.Http2Error;
 import io.netty.handler.codec.http2.Http2Exception;
 import io.netty.handler.codec.http2.Http2Headers;
@@ -36,20 +43,28 @@
    * Constructor used by servers, takes a session which will receive operation events.
    */
   public Http2Codec(Session session, RequestRegistry requestRegistry) {
-    super(true, true);
-    // TODO(user): Use connection.isServer when not private in base class
-    this.client = false;
-    this.session = session;
-    this.requestRegistry = requestRegistry;
+    this(new DefaultHttp2Connection(true, false), false, session, requestRegistry);
   }
 
   /**
    * Constructor used by clients to send operations to a remote server
    */
   public Http2Codec(RequestRegistry requestRegistry) {
-    super(false, true);
-    this.client = true;
-    this.session = null;
+    this(new DefaultHttp2Connection(false, false), true, null, requestRegistry);
+  }
+
+  /**
+   * Constructor used by servers, takes a session which will receive operation events.
+   */
+  private Http2Codec(Http2Connection connection, boolean client,
+                     Session session,
+                     RequestRegistry requestRegistry) {
+    super(connection, new DefaultHttp2FrameReader(),
+        new SuppressCompressionSettingsWriter(), new DefaultHttp2InboundFlowController(connection),
+        new DefaultHttp2OutboundFlowController(connection));
+    // TODO(user): Use connection.isServer when not private in base class
+    this.client = client;
+    this.session = session;
     this.requestRegistry = requestRegistry;
   }
 
@@ -251,6 +266,28 @@
     }
   }
 
+  // TODO(user): Remove this once fixes are done in netty too
+  private static class SuppressCompressionSettingsWriter extends DefaultHttp2FrameWriter {
+    @Override
+    public ChannelFuture writeSettings(ChannelHandlerContext ctx, ChannelPromise promise,
+                                       Http2Settings settings) {
+      Http2Settings newSettings = new Http2Settings();
+      if (settings.hasInitialWindowSize()) {
+        newSettings.initialWindowSize(settings.initialWindowSize());
+      }
+      if (settings.hasMaxConcurrentStreams()) {
+        newSettings.maxConcurrentStreams(settings.maxConcurrentStreams());
+      }
+      if (settings.hasMaxHeaderTableSize()) {
+        newSettings.maxHeaderTableSize(settings.maxHeaderTableSize());
+      }
+      if (settings.hasPushEnabled()) {
+        newSettings.pushEnabled(settings.pushEnabled());
+      }
+      return super.writeSettings(ctx, promise, newSettings);
+    }
+  }
+
   public class Http2Writer {
     private final ChannelHandlerContext ctx;