Allowing Netty TLS bootstrap handler to be sharable.

Fixes #504
diff --git a/benchmarks/src/main/java/io/grpc/benchmarks/qps/ClientConfiguration.java b/benchmarks/src/main/java/io/grpc/benchmarks/qps/ClientConfiguration.java
index 9a131c3..b8b7097 100644
--- a/benchmarks/src/main/java/io/grpc/benchmarks/qps/ClientConfiguration.java
+++ b/benchmarks/src/main/java/io/grpc/benchmarks/qps/ClientConfiguration.java
@@ -41,12 +41,11 @@
 
 import io.grpc.testing.PayloadType;
 import io.grpc.testing.RpcType;
+import io.grpc.testing.TestUtils;
 import io.grpc.transport.netty.NettyChannelBuilder;
 
-import java.net.InetAddress;
 import java.net.InetSocketAddress;
 import java.net.SocketAddress;
-import java.net.UnknownHostException;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.LinkedHashSet;
@@ -56,7 +55,6 @@
  * Configuration options for benchmark clients.
  */
 class ClientConfiguration implements Configuration {
-  private static final String TESTCA_HOST = "foo.test.google.fr";
   private static final ClientConfiguration DEFAULT = new ClientConfiguration();
 
   Transport transport = Transport.NETTY_NIO;
@@ -117,14 +115,9 @@
 
         if (config.testca && config.address instanceof InetSocketAddress) {
           // Override the socket address with the host from the testca.
-          try {
-            InetSocketAddress prevAddress = (InetSocketAddress) config.address;
-            InetAddress inetAddress = InetAddress.getByName(prevAddress.getHostName());
-            inetAddress = InetAddress.getByAddress(TESTCA_HOST, inetAddress.getAddress());
-            config.address = new InetSocketAddress(inetAddress, prevAddress.getPort());
-          } catch (UnknownHostException e) {
-            throw new RuntimeException(e);
-          }
+          InetSocketAddress address = (InetSocketAddress) config.address;
+          config.address = TestUtils.testServerAddress(address.getHostName(),
+                  address.getPort());
         }
       }
 
diff --git a/build.gradle b/build.gradle
index d83b400..b2387ee 100644
--- a/build.gradle
+++ b/build.gradle
@@ -252,6 +252,9 @@
     test {
         testLogging {
             exceptionFormat = 'full'
+            showExceptions true
+            showCauses true
+            showStackTraces true
         }
     }
 }
diff --git a/interop-testing/build.gradle b/interop-testing/build.gradle
index e4e3652..eb6f399 100644
--- a/interop-testing/build.gradle
+++ b/interop-testing/build.gradle
@@ -28,12 +28,6 @@
 
 test {
     jvmArgs "-Xbootclasspath/p:" + configurations.alpnboot.asPath
-    testLogging {
-        exceptionFormat "full"
-        showExceptions true
-        showCauses true
-        showStackTraces true
-    }
 }
 
 task test_client(type: CreateStartScripts) {
diff --git a/interop-testing/src/test/java/io/grpc/testing/integration/Http2NettyTest.java b/interop-testing/src/test/java/io/grpc/testing/integration/Http2NettyTest.java
index 05643b9..504c815 100644
--- a/interop-testing/src/test/java/io/grpc/testing/integration/Http2NettyTest.java
+++ b/interop-testing/src/test/java/io/grpc/testing/integration/Http2NettyTest.java
@@ -43,8 +43,6 @@
 import org.junit.runners.JUnit4;
 
 import java.io.IOException;
-import java.net.InetAddress;
-import java.net.InetSocketAddress;
 
 /**
  * Integration tests for GRPC over HTTP2 using the Netty framework.
@@ -73,10 +71,8 @@
   @Override
   protected ChannelImpl createChannel() {
     try {
-      InetAddress address
-          = InetAddress.getByAddress("foo.test.google.fr", new byte[] {127, 0, 0, 1});
       return NettyChannelBuilder
-          .forAddress(new InetSocketAddress(address, serverPort))
+          .forAddress(TestUtils.testServerAddress(serverPort))
           .sslContext(GrpcSslContexts.forClient().trustManager(
                   TestUtils.loadCert("ca.pem")).build())
           .build();
diff --git a/interop-testing/src/test/java/io/grpc/testing/integration/Http2OkHttpTest.java b/interop-testing/src/test/java/io/grpc/testing/integration/Http2OkHttpTest.java
index b0885fc..b2b8b71 100644
--- a/interop-testing/src/test/java/io/grpc/testing/integration/Http2OkHttpTest.java
+++ b/interop-testing/src/test/java/io/grpc/testing/integration/Http2OkHttpTest.java
@@ -71,7 +71,7 @@
   @Override
   protected ChannelImpl createChannel() {
     OkHttpChannelBuilder builder = OkHttpChannelBuilder.forAddress("127.0.0.1", serverPort)
-        .overrideHostForAuthority("foo.test.google.fr");
+        .overrideHostForAuthority(TestUtils.TEST_SERVER_HOST);
     try {
       builder.sslSocketFactory(TestUtils.getSslSocketFactoryForCertainCert(
               TestUtils.loadCert("ca.pem")));
diff --git a/netty/build.gradle b/netty/build.gradle
index ddf5a63..548edf0 100644
--- a/netty/build.gradle
+++ b/netty/build.gradle
@@ -5,7 +5,12 @@
             libraries.netty
 
     // Tests depend on base class defined by core module.
-    testCompile project(':grpc-core').sourceSets.test.output
+    testCompile project(':grpc-core').sourceSets.test.output,
+                project(':grpc-testing')
+}
+
+test {
+    jvmArgs "-Xbootclasspath/p:" + configurations.alpnboot.asPath
 }
 
 javadoc.options.links 'https://netty.io/4.1/api/'
diff --git a/netty/src/main/java/io/grpc/transport/netty/NettyClientTransport.java b/netty/src/main/java/io/grpc/transport/netty/NettyClientTransport.java
index f485284..2ba4f11 100644
--- a/netty/src/main/java/io/grpc/transport/netty/NettyClientTransport.java
+++ b/netty/src/main/java/io/grpc/transport/netty/NettyClientTransport.java
@@ -37,9 +37,11 @@
 
 import io.grpc.Metadata;
 import io.grpc.MethodDescriptor;
+import io.grpc.Status;
 import io.grpc.transport.ClientStream;
 import io.grpc.transport.ClientStreamListener;
 import io.grpc.transport.ClientTransport;
+
 import io.netty.bootstrap.Bootstrap;
 import io.netty.channel.Channel;
 import io.netty.channel.ChannelFuture;
@@ -123,16 +125,27 @@
     Preconditions.checkNotNull(listener, "listener");
 
     // Create the stream.
-    NettyClientStream stream = new NettyClientStream(listener, channel, handler);
+    final NettyClientStream stream = new NettyClientStream(listener, channel, handler);
 
     // Convert the headers into Netty HTTP/2 headers.
     AsciiString defaultPath = new AsciiString("/" + method.getName());
     Http2Headers http2Headers = Utils.convertClientHeaders(headers, negotiationHandler.scheme(),
         defaultPath, authority);
 
+    ChannelFutureListener failureListener = new ChannelFutureListener() {
+      @Override
+      public void operationComplete(ChannelFuture future) throws Exception {
+        if (!future.isSuccess()) {
+          // Stream creation failed. Close the stream if not already closed.
+          stream.transportReportStatus(Status.fromThrowable(future.cause()), true,
+                  new Metadata.Trailers());
+        }
+      }
+    };
+
     // Write the command requesting the creation of the stream.
     handler.getWriteQueue().enqueue(new CreateStreamCommand(http2Headers, stream),
-        !method.getType().clientSendsOneMessage());
+            !method.getType().clientSendsOneMessage()).addListener(failureListener);
     return stream;
   }
 
@@ -159,19 +172,7 @@
     channel.closeFuture().addListener(new ChannelFutureListener() {
       @Override
       public void operationComplete(ChannelFuture future) throws Exception {
-        if (!future.isSuccess()) {
-          // The close failed. Just notify that transport shutdown failed.
-          notifyTerminated(future.cause());
-          return;
-        }
-
-        if (handler.connectionError() != null) {
-          // The handler encountered a connection error.
-          notifyTerminated(handler.connectionError());
-        } else {
-          // Normal termination of the connection.
-          notifyTerminated(null);
-        }
+        notifyTerminated(handler.connectionError());
       }
     });
   }
diff --git a/netty/src/main/java/io/grpc/transport/netty/NettyServerTransport.java b/netty/src/main/java/io/grpc/transport/netty/NettyServerTransport.java
index 2ee9ad7..d08b697 100644
--- a/netty/src/main/java/io/grpc/transport/netty/NettyServerTransport.java
+++ b/netty/src/main/java/io/grpc/transport/netty/NettyServerTransport.java
@@ -90,15 +90,7 @@
     channel.closeFuture().addListener(new ChannelFutureListener() {
       @Override
       public void operationComplete(ChannelFuture future) throws Exception {
-        if (!future.isSuccess()) {
-          notifyTerminated(future.cause());
-        } else if (handler.connectionError() != null) {
-          // The handler encountered a connection error.
-          notifyTerminated(handler.connectionError());
-        } else {
-          // Normal termination of the connection.
-          notifyTerminated(null);
-        }
+        notifyTerminated(handler.connectionError());
       }
     });
 
diff --git a/netty/src/main/java/io/grpc/transport/netty/ProtocolNegotiators.java b/netty/src/main/java/io/grpc/transport/netty/ProtocolNegotiators.java
index 862d290..055d615 100644
--- a/netty/src/main/java/io/grpc/transport/netty/ProtocolNegotiators.java
+++ b/netty/src/main/java/io/grpc/transport/netty/ProtocolNegotiators.java
@@ -33,6 +33,8 @@
 
 import com.google.common.base.Preconditions;
 
+import io.grpc.Status;
+
 import io.netty.channel.Channel;
 import io.netty.channel.ChannelDuplexHandler;
 import io.netty.channel.ChannelHandler;
@@ -56,6 +58,8 @@
 import java.net.InetSocketAddress;
 import java.util.ArrayDeque;
 import java.util.Queue;
+import java.util.logging.Level;
+import java.util.logging.Logger;
 
 import javax.net.ssl.SSLEngine;
 import javax.net.ssl.SSLParameters;
@@ -64,6 +68,7 @@
  * Common {@link ProtocolNegotiator}s used by gRPC.
  */
 public final class ProtocolNegotiators {
+  private static final Logger log = Logger.getLogger(ProtocolNegotiators.class.getName());
 
   private ProtocolNegotiators() {
   }
@@ -86,32 +91,31 @@
     Preconditions.checkNotNull(sslContext, "sslContext");
     Preconditions.checkNotNull(inetAddress, "inetAddress");
 
-    final ChannelHandler sslBootstrapHandler = new ChannelHandlerAdapter() {
+    return new ProtocolNegotiator() {
       @Override
-      public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
-        // TODO(nmittler): This method is currently unsupported for OpenSSL. Need to fix in Netty.
-        SSLEngine sslEngine = sslContext.newEngine(ctx.alloc(),
-            inetAddress.getHostName(), inetAddress.getPort());
-        SSLParameters sslParams = new SSLParameters();
-        sslParams.setEndpointIdentificationAlgorithm("HTTPS");
-        sslEngine.setSSLParameters(sslParams);
+      public Handler newHandler(Http2ConnectionHandler handler) {
+        ChannelHandler sslBootstrap = new ChannelHandlerAdapter() {
+          @Override
+          public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
+            // TODO(nmittler): Unsupported for OpenSSL in Netty < 4.1.Beta6.
+            SSLEngine sslEngine = sslContext.newEngine(ctx.alloc(),
+                    inetAddress.getHostName(), inetAddress.getPort());
+            SSLParameters sslParams = new SSLParameters();
+            sslParams.setEndpointIdentificationAlgorithm("HTTPS");
+            sslEngine.setSSLParameters(sslParams);
 
-        SslHandler sslHandler = new SslHandler(sslEngine, false);
-        sslHandler.handshakeFuture().addListener(
-            new GenericFutureListener<Future<? super Channel>>() {
+            SslHandler sslHandler = new SslHandler(sslEngine, false);
+            sslHandler.handshakeFuture().addListener(new GenericFutureListener<Future<Channel>>() {
               @Override
-              public void operationComplete(Future<? super Channel> future) throws Exception {
+              public void operationComplete(Future<Channel> future) throws Exception {
                 // If an error occurred during the handshake, throw it to the pipeline.
                 future.get();
               }
             });
-        ctx.pipeline().replace(this, "sslHandler", sslHandler);
-      }
-    };
-    return new ProtocolNegotiator() {
-      @Override
-      public Handler newHandler(Http2ConnectionHandler handler) {
-        return new BufferUntilTlsNegotiatedHandler(sslBootstrapHandler, handler);
+            ctx.pipeline().replace(this, null, sslHandler);
+          }
+        };
+        return new BufferUntilTlsNegotiatedHandler(sslBootstrap, handler);
       }
     };
   }
@@ -146,9 +150,13 @@
     };
   }
 
+  private static RuntimeException unavailableException(String msg) {
+    return Status.UNAVAILABLE.withDescription(msg).asRuntimeException();
+  }
+
   /**
    * Buffers all writes until either {@link #writeBufferedAndRemove(ChannelHandlerContext)} or
-   * {@link #failBufferedAndClose(ChannelHandlerContext)} is called. This handler allows us to
+   * {@link #fail(ChannelHandlerContext, Throwable)} is called. This handler allows us to
    * write to a {@link Channel} before we are allowed to write to it officially i.e.
    * before it's active or the TLS Handshake is complete.
    */
@@ -181,8 +189,13 @@
     }
 
     @Override
+    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
+      fail(ctx, cause);
+    }
+
+    @Override
     public void channelInactive(ChannelHandlerContext ctx) throws Exception {
-      failBufferedAndClose(ctx);
+      fail(ctx, unavailableException("Connection broken while performing protocol negotiation"));
       super.channelInactive(ctx);
     }
 
@@ -224,18 +237,20 @@
 
     @Override
     public void close(ChannelHandlerContext ctx, ChannelPromise future) throws Exception {
-      failBufferedAndClose(ctx);
+      fail(ctx, unavailableException("Channel closed while performing protocol negotiation"));
     }
 
-    protected void failBufferedAndClose(ChannelHandlerContext ctx) {
+    protected final void fail(ChannelHandlerContext ctx, Throwable cause) {
       if (bufferedWrites != null) {
-        Exception e = new Exception("Buffered write failed.");
         while (!bufferedWrites.isEmpty()) {
           ChannelWrite write = bufferedWrites.poll();
-          write.promise.setFailure(e);
+          write.promise.setFailure(cause);
         }
         bufferedWrites = null;
       }
+
+      log.log(Level.SEVERE, "Transport failed during protocol negotiation", cause);
+
       /**
        * In case something goes wrong ensure that the channel gets closed as the
        * NettyClientTransport relies on the channel's close future to get completed.
@@ -243,7 +258,7 @@
       ctx.close();
     }
 
-    protected void writeBufferedAndRemove(ChannelHandlerContext ctx) {
+    protected final void writeBufferedAndRemove(ChannelHandlerContext ctx) {
       if (!ctx.channel().isActive() || writing) {
         return;
       }
@@ -298,7 +313,7 @@
         if (handshakeEvent.isSuccess()) {
           writeBufferedAndRemove(ctx);
         } else {
-          failBufferedAndClose(ctx);
+          fail(ctx, handshakeEvent.cause());
         }
       }
       super.userEventTriggered(ctx, evt);
@@ -362,8 +377,7 @@
       if (evt == HttpClientUpgradeHandler.UpgradeEvent.UPGRADE_SUCCESSFUL) {
         writeBufferedAndRemove(ctx);
       } else if (evt == HttpClientUpgradeHandler.UpgradeEvent.UPGRADE_REJECTED) {
-        failBufferedAndClose(ctx);
-        ctx.pipeline().fireExceptionCaught(new Exception("HTTP/2 upgrade rejected"));
+        fail(ctx, unavailableException("HTTP/2 upgrade rejected"));
       }
       super.userEventTriggered(ctx, evt);
     }
diff --git a/netty/src/main/java/io/grpc/transport/netty/WriteQueue.java b/netty/src/main/java/io/grpc/transport/netty/WriteQueue.java
index 0e8b2c3..0059222 100644
--- a/netty/src/main/java/io/grpc/transport/netty/WriteQueue.java
+++ b/netty/src/main/java/io/grpc/transport/netty/WriteQueue.java
@@ -34,6 +34,7 @@
 import com.google.common.base.Preconditions;
 
 import io.netty.channel.Channel;
+import io.netty.channel.ChannelFuture;
 import io.netty.channel.ChannelPromise;
 
 import java.util.ArrayDeque;
@@ -92,8 +93,8 @@
    * @param flush true if a flush of the write should be schedule, false if a later call to
    *              enqueue will schedule the flush.
    */
-  void enqueue(Object command, boolean flush) {
-    enqueue(command, channel.newPromise(), flush);
+  ChannelFuture enqueue(Object command, boolean flush) {
+    return enqueue(command, channel.newPromise(), flush);
   }
 
   /**
@@ -104,11 +105,12 @@
    * @param flush true if a flush of the write should be schedule, false if a later call to
    *              enqueue will schedule the flush.
    */
-  void enqueue(Object command, ChannelPromise promise, boolean flush) {
+  ChannelFuture enqueue(Object command, ChannelPromise promise, boolean flush) {
     queue.add(new QueuedCommand(command, promise));
     if (flush) {
       scheduleFlush();
     }
+    return promise;
   }
 
   /**
diff --git a/netty/src/test/java/io/grpc/transport/netty/NettyClientStreamTest.java b/netty/src/test/java/io/grpc/transport/netty/NettyClientStreamTest.java
index ca8ec81..32fd2ec 100644
--- a/netty/src/test/java/io/grpc/transport/netty/NettyClientStreamTest.java
+++ b/netty/src/test/java/io/grpc/transport/netty/NettyClientStreamTest.java
@@ -44,7 +44,6 @@
 import static org.mockito.Matchers.eq;
 import static org.mockito.Matchers.same;
 import static org.mockito.Mockito.doAnswer;
-import static org.mockito.Mockito.doNothing;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.reset;
@@ -54,6 +53,7 @@
 import io.grpc.Metadata;
 import io.grpc.Status;
 import io.grpc.transport.ClientStreamListener;
+
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.Unpooled;
 import io.netty.channel.ChannelPromise;
@@ -328,7 +328,7 @@
         return null;
       }
     }).when(writeQueue).enqueue(any(), any(ChannelPromise.class), anyBoolean());
-    doNothing().when(writeQueue).enqueue(any(), anyBoolean());
+    when(writeQueue.enqueue(any(), anyBoolean())).thenReturn(future);
     NettyClientStream stream = new NettyClientStream(listener, channel, handler);
     assertTrue(stream.canSend());
     assertTrue(stream.canReceive());
diff --git a/netty/src/test/java/io/grpc/transport/netty/NettyClientTransportTest.java b/netty/src/test/java/io/grpc/transport/netty/NettyClientTransportTest.java
new file mode 100644
index 0000000..f4e504d
--- /dev/null
+++ b/netty/src/test/java/io/grpc/transport/netty/NettyClientTransportTest.java
@@ -0,0 +1,255 @@
+/*
+ * Copyright 2014, Google Inc. All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ *    * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ *    * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ *
+ *    * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+package io.grpc.transport.netty;
+
+import static com.google.common.base.Charsets.UTF_8;
+import static io.netty.handler.codec.http2.Http2CodecUtil.DEFAULT_WINDOW_SIZE;
+
+import com.google.common.io.ByteStreams;
+import com.google.common.util.concurrent.SettableFuture;
+
+import io.grpc.Marshaller;
+import io.grpc.Metadata;
+import io.grpc.MethodDescriptor;
+import io.grpc.MethodType;
+import io.grpc.Status;
+import io.grpc.testing.TestUtils;
+import io.grpc.transport.ClientStream;
+import io.grpc.transport.ClientStreamListener;
+import io.grpc.transport.ClientTransport;
+import io.grpc.transport.ServerListener;
+import io.grpc.transport.ServerStream;
+import io.grpc.transport.ServerStreamListener;
+import io.grpc.transport.ServerTransport;
+import io.grpc.transport.ServerTransportListener;
+
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.nio.NioServerSocketChannel;
+import io.netty.channel.socket.nio.NioSocketChannel;
+import io.netty.handler.ssl.SslContext;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+
+import java.io.ByteArrayInputStream;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Tests for {@link NettyClientTransport}.
+ */
+@RunWith(JUnit4.class)
+public class NettyClientTransportTest {
+  private static final String MESSAGE = "hello";
+
+  @Mock
+  private ClientTransport.Listener clientTransportListener;
+
+  private final List<NettyClientTransport> transports = new ArrayList<NettyClientTransport>();
+  private NioEventLoopGroup group;
+  private InetSocketAddress address;
+  private NettyServer server;
+
+  @Before
+  public void setup() throws Exception {
+    MockitoAnnotations.initMocks(this);
+
+    group = new NioEventLoopGroup(1);
+
+    // Start the server.
+    address = TestUtils.testServerAddress(TestUtils.pickUnusedPort());
+    File serverCert = TestUtils.loadCert("server1.pem");
+    File key = TestUtils.loadCert("server1.key");
+    SslContext serverContext = GrpcSslContexts.forServer(serverCert, key).build();
+    server = new NettyServer(address, NioServerSocketChannel.class,
+            group, group, serverContext, 100, DEFAULT_WINDOW_SIZE, DEFAULT_WINDOW_SIZE);
+    server.start(new TestServerListener());
+  }
+
+  @After
+  public void teardown() throws Exception {
+    for (NettyClientTransport transport : transports) {
+      transport.shutdown();
+    }
+
+    if (server != null) {
+      server.shutdown();
+    }
+
+    group.shutdownGracefully(0, 10, TimeUnit.SECONDS);
+  }
+
+  /**
+   * Verifies that we can create multiple TLS client transports from the same builder.
+   */
+  @Test
+  public void creatingMultipleTlsTransportsShouldSucceed() throws Exception {
+    // Create the protocol negotiator.
+    File clientCert = TestUtils.loadCert("ca.pem");
+    SslContext clientContext = GrpcSslContexts.forClient().trustManager(clientCert).build();
+    ProtocolNegotiator negotiator = ProtocolNegotiators.tls(clientContext, address);
+
+    // Create a couple client transports.
+    for (int index = 0; index < 2; ++index) {
+      NettyClientTransport transport = new NettyClientTransport(address, NioSocketChannel.class,
+              group, negotiator, DEFAULT_WINDOW_SIZE, DEFAULT_WINDOW_SIZE);
+      transports.add(transport);
+      transport.start(clientTransportListener);
+    }
+
+    // Send a single RPC on each transport.
+    final List<SettableFuture> rpcFutures = new ArrayList<SettableFuture>(transports.size());
+    MethodDescriptor<String, String> method = MethodDescriptor.create(MethodType.UNARY,
+            "/testService/test", 10, TimeUnit.SECONDS, StringMarshaller.INSTANCE,
+            StringMarshaller.INSTANCE);
+    for (NettyClientTransport transport : transports) {
+      SettableFuture rpcFuture = SettableFuture.create();
+      rpcFutures.add(rpcFuture);
+      ClientStream stream = transport.newStream(method, new Metadata.Headers(),
+              new TestClientStreamListener(rpcFuture));
+      stream.request(1);
+      stream.writeMessage(messageStream());
+      stream.halfClose();
+    }
+
+    // Wait for the RPCs to complete.
+    for (SettableFuture rpcFuture : rpcFutures) {
+      rpcFuture.get(10, TimeUnit.SECONDS);
+    }
+  }
+
+  private static InputStream messageStream() {
+    return new ByteArrayInputStream(MESSAGE.getBytes());
+  }
+
+  private static class TestClientStreamListener implements ClientStreamListener {
+    private final SettableFuture<?> future;
+
+    TestClientStreamListener(SettableFuture<?> future) {
+      this.future = future;
+    }
+
+    @Override
+    public void headersRead(Metadata.Headers headers) {
+    }
+
+    @Override
+    public void closed(Status status, Metadata.Trailers trailers) {
+      if (status.isOk()) {
+        future.set(null);
+      } else {
+        future.setException(status.asException());
+      }
+    }
+
+    @Override
+    public void messageRead(InputStream message) {
+    }
+
+    @Override
+    public void onReady() {
+    }
+  }
+
+  private static class TestServerListener implements ServerListener {
+
+    @Override
+    public ServerTransportListener transportCreated(final ServerTransport transport) {
+      return new ServerTransportListener() {
+
+        @Override
+        public ServerStreamListener streamCreated(final ServerStream stream, String method,
+                                                  Metadata.Headers headers) {
+          stream.request(1);
+          return new ServerStreamListener() {
+
+            @Override
+            public void messageRead(InputStream message) {
+              // Just echo back the message.
+              stream.writeMessage(messageStream());
+            }
+
+            @Override
+            public void onReady() {
+            }
+
+            @Override
+            public void halfClosed() {
+              // Just close when the client closes.
+              stream.close(Status.OK, new Metadata.Trailers());
+            }
+
+            @Override
+            public void closed(Status status) {
+            }
+          };
+        }
+
+        @Override
+        public void transportTerminated() {
+        }
+      };
+    }
+
+    @Override
+    public void serverShutdown() {
+    }
+  }
+
+  private static class StringMarshaller implements Marshaller<String> {
+    static final StringMarshaller INSTANCE = new StringMarshaller();
+
+    @Override
+    public InputStream stream(String value) {
+      return new ByteArrayInputStream(value.getBytes(UTF_8));
+    }
+
+    @Override
+    public String parse(InputStream stream) {
+      try {
+        return new String(ByteStreams.toByteArray(stream), UTF_8);
+      } catch (IOException ex) {
+        throw new RuntimeException(ex);
+      }
+    }
+  }
+}
diff --git a/netty/src/test/java/io/grpc/transport/netty/NettyServerStreamTest.java b/netty/src/test/java/io/grpc/transport/netty/NettyServerStreamTest.java
index 61997ad..b5ec169 100644
--- a/netty/src/test/java/io/grpc/transport/netty/NettyServerStreamTest.java
+++ b/netty/src/test/java/io/grpc/transport/netty/NettyServerStreamTest.java
@@ -40,7 +40,6 @@
 import static org.mockito.Matchers.same;
 import static org.mockito.Mockito.atLeastOnce;
 import static org.mockito.Mockito.doAnswer;
-import static org.mockito.Mockito.doNothing;
 import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.reset;
 import static org.mockito.Mockito.verify;
@@ -51,6 +50,7 @@
 import io.grpc.Metadata;
 import io.grpc.Status;
 import io.grpc.transport.ServerStreamListener;
+
 import io.netty.buffer.EmptyByteBuf;
 import io.netty.buffer.UnpooledByteBufAllocator;
 import io.netty.channel.ChannelPromise;
@@ -246,7 +246,7 @@
         return null;
       }
     }).when(writeQueue).enqueue(any(), any(ChannelPromise.class), anyBoolean());
-    doNothing().when(writeQueue).enqueue(any(), anyBoolean());
+    when(writeQueue.enqueue(any(), anyBoolean())).thenReturn(future);
     NettyServerStream stream = new NettyServerStream(channel, http2Stream, handler);
     stream.setListener(serverListener);
     assertTrue(stream.canReceive());
diff --git a/testing/src/main/java/io/grpc/testing/TestUtils.java b/testing/src/main/java/io/grpc/testing/TestUtils.java
index 6d305ed..9efa5ee 100644
--- a/testing/src/main/java/io/grpc/testing/TestUtils.java
+++ b/testing/src/main/java/io/grpc/testing/TestUtils.java
@@ -45,7 +45,10 @@
 import java.io.FileWriter;
 import java.io.IOException;
 import java.io.InputStream;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
 import java.net.ServerSocket;
+import java.net.UnknownHostException;
 import java.security.KeyStore;
 import java.security.cert.CertificateFactory;
 import java.security.cert.X509Certificate;
@@ -62,6 +65,7 @@
  * Common utility functions useful for writing tests.
  */
 public class TestUtils {
+  public static final String TEST_SERVER_HOST = "foo.test.google.fr";
 
   /**
    * Echo the request headers from a client into response headers and trailers. Useful for
@@ -120,6 +124,32 @@
   }
 
   /**
+   * Creates a new {@link InetSocketAddress} that overrides the host with {@link #TEST_SERVER_HOST}.
+   */
+  public static InetSocketAddress testServerAddress(String host, int port) {
+    try {
+      InetAddress inetAddress = InetAddress.getByName(host);
+      inetAddress = InetAddress.getByAddress(TEST_SERVER_HOST, inetAddress.getAddress());
+      return new InetSocketAddress(inetAddress, port);
+    } catch (UnknownHostException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  /**
+   * Creates a new {@link InetSocketAddress} on localhost that overrides the host with
+   * {@link #TEST_SERVER_HOST}.
+   */
+  public static InetSocketAddress testServerAddress(int port) {
+    try {
+      InetAddress inetAddress = InetAddress.getByName("localhost");
+      inetAddress = InetAddress.getByAddress(TEST_SERVER_HOST, inetAddress.getAddress());
+      return new InetSocketAddress(inetAddress, port);
+    } catch (UnknownHostException e) {
+      throw new RuntimeException(e);
+    }
+  }
+  /**
    * Load a file from the resources folder.
    *
    * @param name  name of a file in src/main/resources/certs.