Allowing Netty TLS bootstrap handler to be sharable.
Fixes #504
diff --git a/benchmarks/src/main/java/io/grpc/benchmarks/qps/ClientConfiguration.java b/benchmarks/src/main/java/io/grpc/benchmarks/qps/ClientConfiguration.java
index 9a131c3..b8b7097 100644
--- a/benchmarks/src/main/java/io/grpc/benchmarks/qps/ClientConfiguration.java
+++ b/benchmarks/src/main/java/io/grpc/benchmarks/qps/ClientConfiguration.java
@@ -41,12 +41,11 @@
import io.grpc.testing.PayloadType;
import io.grpc.testing.RpcType;
+import io.grpc.testing.TestUtils;
import io.grpc.transport.netty.NettyChannelBuilder;
-import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
-import java.net.UnknownHostException;
import java.util.Collection;
import java.util.Collections;
import java.util.LinkedHashSet;
@@ -56,7 +55,6 @@
* Configuration options for benchmark clients.
*/
class ClientConfiguration implements Configuration {
- private static final String TESTCA_HOST = "foo.test.google.fr";
private static final ClientConfiguration DEFAULT = new ClientConfiguration();
Transport transport = Transport.NETTY_NIO;
@@ -117,14 +115,9 @@
if (config.testca && config.address instanceof InetSocketAddress) {
// Override the socket address with the host from the testca.
- try {
- InetSocketAddress prevAddress = (InetSocketAddress) config.address;
- InetAddress inetAddress = InetAddress.getByName(prevAddress.getHostName());
- inetAddress = InetAddress.getByAddress(TESTCA_HOST, inetAddress.getAddress());
- config.address = new InetSocketAddress(inetAddress, prevAddress.getPort());
- } catch (UnknownHostException e) {
- throw new RuntimeException(e);
- }
+ InetSocketAddress address = (InetSocketAddress) config.address;
+ config.address = TestUtils.testServerAddress(address.getHostName(),
+ address.getPort());
}
}
diff --git a/build.gradle b/build.gradle
index d83b400..b2387ee 100644
--- a/build.gradle
+++ b/build.gradle
@@ -252,6 +252,9 @@
test {
testLogging {
exceptionFormat = 'full'
+ showExceptions true
+ showCauses true
+ showStackTraces true
}
}
}
diff --git a/interop-testing/build.gradle b/interop-testing/build.gradle
index e4e3652..eb6f399 100644
--- a/interop-testing/build.gradle
+++ b/interop-testing/build.gradle
@@ -28,12 +28,6 @@
test {
jvmArgs "-Xbootclasspath/p:" + configurations.alpnboot.asPath
- testLogging {
- exceptionFormat "full"
- showExceptions true
- showCauses true
- showStackTraces true
- }
}
task test_client(type: CreateStartScripts) {
diff --git a/interop-testing/src/test/java/io/grpc/testing/integration/Http2NettyTest.java b/interop-testing/src/test/java/io/grpc/testing/integration/Http2NettyTest.java
index 05643b9..504c815 100644
--- a/interop-testing/src/test/java/io/grpc/testing/integration/Http2NettyTest.java
+++ b/interop-testing/src/test/java/io/grpc/testing/integration/Http2NettyTest.java
@@ -43,8 +43,6 @@
import org.junit.runners.JUnit4;
import java.io.IOException;
-import java.net.InetAddress;
-import java.net.InetSocketAddress;
/**
* Integration tests for GRPC over HTTP2 using the Netty framework.
@@ -73,10 +71,8 @@
@Override
protected ChannelImpl createChannel() {
try {
- InetAddress address
- = InetAddress.getByAddress("foo.test.google.fr", new byte[] {127, 0, 0, 1});
return NettyChannelBuilder
- .forAddress(new InetSocketAddress(address, serverPort))
+ .forAddress(TestUtils.testServerAddress(serverPort))
.sslContext(GrpcSslContexts.forClient().trustManager(
TestUtils.loadCert("ca.pem")).build())
.build();
diff --git a/interop-testing/src/test/java/io/grpc/testing/integration/Http2OkHttpTest.java b/interop-testing/src/test/java/io/grpc/testing/integration/Http2OkHttpTest.java
index b0885fc..b2b8b71 100644
--- a/interop-testing/src/test/java/io/grpc/testing/integration/Http2OkHttpTest.java
+++ b/interop-testing/src/test/java/io/grpc/testing/integration/Http2OkHttpTest.java
@@ -71,7 +71,7 @@
@Override
protected ChannelImpl createChannel() {
OkHttpChannelBuilder builder = OkHttpChannelBuilder.forAddress("127.0.0.1", serverPort)
- .overrideHostForAuthority("foo.test.google.fr");
+ .overrideHostForAuthority(TestUtils.TEST_SERVER_HOST);
try {
builder.sslSocketFactory(TestUtils.getSslSocketFactoryForCertainCert(
TestUtils.loadCert("ca.pem")));
diff --git a/netty/build.gradle b/netty/build.gradle
index ddf5a63..548edf0 100644
--- a/netty/build.gradle
+++ b/netty/build.gradle
@@ -5,7 +5,12 @@
libraries.netty
// Tests depend on base class defined by core module.
- testCompile project(':grpc-core').sourceSets.test.output
+ testCompile project(':grpc-core').sourceSets.test.output,
+ project(':grpc-testing')
+}
+
+test {
+ jvmArgs "-Xbootclasspath/p:" + configurations.alpnboot.asPath
}
javadoc.options.links 'https://netty.io/4.1/api/'
diff --git a/netty/src/main/java/io/grpc/transport/netty/NettyClientTransport.java b/netty/src/main/java/io/grpc/transport/netty/NettyClientTransport.java
index f485284..2ba4f11 100644
--- a/netty/src/main/java/io/grpc/transport/netty/NettyClientTransport.java
+++ b/netty/src/main/java/io/grpc/transport/netty/NettyClientTransport.java
@@ -37,9 +37,11 @@
import io.grpc.Metadata;
import io.grpc.MethodDescriptor;
+import io.grpc.Status;
import io.grpc.transport.ClientStream;
import io.grpc.transport.ClientStreamListener;
import io.grpc.transport.ClientTransport;
+
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
@@ -123,16 +125,27 @@
Preconditions.checkNotNull(listener, "listener");
// Create the stream.
- NettyClientStream stream = new NettyClientStream(listener, channel, handler);
+ final NettyClientStream stream = new NettyClientStream(listener, channel, handler);
// Convert the headers into Netty HTTP/2 headers.
AsciiString defaultPath = new AsciiString("/" + method.getName());
Http2Headers http2Headers = Utils.convertClientHeaders(headers, negotiationHandler.scheme(),
defaultPath, authority);
+ ChannelFutureListener failureListener = new ChannelFutureListener() {
+ @Override
+ public void operationComplete(ChannelFuture future) throws Exception {
+ if (!future.isSuccess()) {
+ // Stream creation failed. Close the stream if not already closed.
+ stream.transportReportStatus(Status.fromThrowable(future.cause()), true,
+ new Metadata.Trailers());
+ }
+ }
+ };
+
// Write the command requesting the creation of the stream.
handler.getWriteQueue().enqueue(new CreateStreamCommand(http2Headers, stream),
- !method.getType().clientSendsOneMessage());
+ !method.getType().clientSendsOneMessage()).addListener(failureListener);
return stream;
}
@@ -159,19 +172,7 @@
channel.closeFuture().addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
- if (!future.isSuccess()) {
- // The close failed. Just notify that transport shutdown failed.
- notifyTerminated(future.cause());
- return;
- }
-
- if (handler.connectionError() != null) {
- // The handler encountered a connection error.
- notifyTerminated(handler.connectionError());
- } else {
- // Normal termination of the connection.
- notifyTerminated(null);
- }
+ notifyTerminated(handler.connectionError());
}
});
}
diff --git a/netty/src/main/java/io/grpc/transport/netty/NettyServerTransport.java b/netty/src/main/java/io/grpc/transport/netty/NettyServerTransport.java
index 2ee9ad7..d08b697 100644
--- a/netty/src/main/java/io/grpc/transport/netty/NettyServerTransport.java
+++ b/netty/src/main/java/io/grpc/transport/netty/NettyServerTransport.java
@@ -90,15 +90,7 @@
channel.closeFuture().addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
- if (!future.isSuccess()) {
- notifyTerminated(future.cause());
- } else if (handler.connectionError() != null) {
- // The handler encountered a connection error.
- notifyTerminated(handler.connectionError());
- } else {
- // Normal termination of the connection.
- notifyTerminated(null);
- }
+ notifyTerminated(handler.connectionError());
}
});
diff --git a/netty/src/main/java/io/grpc/transport/netty/ProtocolNegotiators.java b/netty/src/main/java/io/grpc/transport/netty/ProtocolNegotiators.java
index 862d290..055d615 100644
--- a/netty/src/main/java/io/grpc/transport/netty/ProtocolNegotiators.java
+++ b/netty/src/main/java/io/grpc/transport/netty/ProtocolNegotiators.java
@@ -33,6 +33,8 @@
import com.google.common.base.Preconditions;
+import io.grpc.Status;
+
import io.netty.channel.Channel;
import io.netty.channel.ChannelDuplexHandler;
import io.netty.channel.ChannelHandler;
@@ -56,6 +58,8 @@
import java.net.InetSocketAddress;
import java.util.ArrayDeque;
import java.util.Queue;
+import java.util.logging.Level;
+import java.util.logging.Logger;
import javax.net.ssl.SSLEngine;
import javax.net.ssl.SSLParameters;
@@ -64,6 +68,7 @@
* Common {@link ProtocolNegotiator}s used by gRPC.
*/
public final class ProtocolNegotiators {
+ private static final Logger log = Logger.getLogger(ProtocolNegotiators.class.getName());
private ProtocolNegotiators() {
}
@@ -86,32 +91,31 @@
Preconditions.checkNotNull(sslContext, "sslContext");
Preconditions.checkNotNull(inetAddress, "inetAddress");
- final ChannelHandler sslBootstrapHandler = new ChannelHandlerAdapter() {
+ return new ProtocolNegotiator() {
@Override
- public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
- // TODO(nmittler): This method is currently unsupported for OpenSSL. Need to fix in Netty.
- SSLEngine sslEngine = sslContext.newEngine(ctx.alloc(),
- inetAddress.getHostName(), inetAddress.getPort());
- SSLParameters sslParams = new SSLParameters();
- sslParams.setEndpointIdentificationAlgorithm("HTTPS");
- sslEngine.setSSLParameters(sslParams);
+ public Handler newHandler(Http2ConnectionHandler handler) {
+ ChannelHandler sslBootstrap = new ChannelHandlerAdapter() {
+ @Override
+ public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
+ // TODO(nmittler): Unsupported for OpenSSL in Netty < 4.1.Beta6.
+ SSLEngine sslEngine = sslContext.newEngine(ctx.alloc(),
+ inetAddress.getHostName(), inetAddress.getPort());
+ SSLParameters sslParams = new SSLParameters();
+ sslParams.setEndpointIdentificationAlgorithm("HTTPS");
+ sslEngine.setSSLParameters(sslParams);
- SslHandler sslHandler = new SslHandler(sslEngine, false);
- sslHandler.handshakeFuture().addListener(
- new GenericFutureListener<Future<? super Channel>>() {
+ SslHandler sslHandler = new SslHandler(sslEngine, false);
+ sslHandler.handshakeFuture().addListener(new GenericFutureListener<Future<Channel>>() {
@Override
- public void operationComplete(Future<? super Channel> future) throws Exception {
+ public void operationComplete(Future<Channel> future) throws Exception {
// If an error occurred during the handshake, throw it to the pipeline.
future.get();
}
});
- ctx.pipeline().replace(this, "sslHandler", sslHandler);
- }
- };
- return new ProtocolNegotiator() {
- @Override
- public Handler newHandler(Http2ConnectionHandler handler) {
- return new BufferUntilTlsNegotiatedHandler(sslBootstrapHandler, handler);
+ ctx.pipeline().replace(this, null, sslHandler);
+ }
+ };
+ return new BufferUntilTlsNegotiatedHandler(sslBootstrap, handler);
}
};
}
@@ -146,9 +150,13 @@
};
}
+ private static RuntimeException unavailableException(String msg) {
+ return Status.UNAVAILABLE.withDescription(msg).asRuntimeException();
+ }
+
/**
* Buffers all writes until either {@link #writeBufferedAndRemove(ChannelHandlerContext)} or
- * {@link #failBufferedAndClose(ChannelHandlerContext)} is called. This handler allows us to
+ * {@link #fail(ChannelHandlerContext, Throwable)} is called. This handler allows us to
* write to a {@link Channel} before we are allowed to write to it officially i.e.
* before it's active or the TLS Handshake is complete.
*/
@@ -181,8 +189,13 @@
}
@Override
+ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
+ fail(ctx, cause);
+ }
+
+ @Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
- failBufferedAndClose(ctx);
+ fail(ctx, unavailableException("Connection broken while performing protocol negotiation"));
super.channelInactive(ctx);
}
@@ -224,18 +237,20 @@
@Override
public void close(ChannelHandlerContext ctx, ChannelPromise future) throws Exception {
- failBufferedAndClose(ctx);
+ fail(ctx, unavailableException("Channel closed while performing protocol negotiation"));
}
- protected void failBufferedAndClose(ChannelHandlerContext ctx) {
+ protected final void fail(ChannelHandlerContext ctx, Throwable cause) {
if (bufferedWrites != null) {
- Exception e = new Exception("Buffered write failed.");
while (!bufferedWrites.isEmpty()) {
ChannelWrite write = bufferedWrites.poll();
- write.promise.setFailure(e);
+ write.promise.setFailure(cause);
}
bufferedWrites = null;
}
+
+ log.log(Level.SEVERE, "Transport failed during protocol negotiation", cause);
+
/**
* In case something goes wrong ensure that the channel gets closed as the
* NettyClientTransport relies on the channel's close future to get completed.
@@ -243,7 +258,7 @@
ctx.close();
}
- protected void writeBufferedAndRemove(ChannelHandlerContext ctx) {
+ protected final void writeBufferedAndRemove(ChannelHandlerContext ctx) {
if (!ctx.channel().isActive() || writing) {
return;
}
@@ -298,7 +313,7 @@
if (handshakeEvent.isSuccess()) {
writeBufferedAndRemove(ctx);
} else {
- failBufferedAndClose(ctx);
+ fail(ctx, handshakeEvent.cause());
}
}
super.userEventTriggered(ctx, evt);
@@ -362,8 +377,7 @@
if (evt == HttpClientUpgradeHandler.UpgradeEvent.UPGRADE_SUCCESSFUL) {
writeBufferedAndRemove(ctx);
} else if (evt == HttpClientUpgradeHandler.UpgradeEvent.UPGRADE_REJECTED) {
- failBufferedAndClose(ctx);
- ctx.pipeline().fireExceptionCaught(new Exception("HTTP/2 upgrade rejected"));
+ fail(ctx, unavailableException("HTTP/2 upgrade rejected"));
}
super.userEventTriggered(ctx, evt);
}
diff --git a/netty/src/main/java/io/grpc/transport/netty/WriteQueue.java b/netty/src/main/java/io/grpc/transport/netty/WriteQueue.java
index 0e8b2c3..0059222 100644
--- a/netty/src/main/java/io/grpc/transport/netty/WriteQueue.java
+++ b/netty/src/main/java/io/grpc/transport/netty/WriteQueue.java
@@ -34,6 +34,7 @@
import com.google.common.base.Preconditions;
import io.netty.channel.Channel;
+import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelPromise;
import java.util.ArrayDeque;
@@ -92,8 +93,8 @@
* @param flush true if a flush of the write should be schedule, false if a later call to
* enqueue will schedule the flush.
*/
- void enqueue(Object command, boolean flush) {
- enqueue(command, channel.newPromise(), flush);
+ ChannelFuture enqueue(Object command, boolean flush) {
+ return enqueue(command, channel.newPromise(), flush);
}
/**
@@ -104,11 +105,12 @@
* @param flush true if a flush of the write should be schedule, false if a later call to
* enqueue will schedule the flush.
*/
- void enqueue(Object command, ChannelPromise promise, boolean flush) {
+ ChannelFuture enqueue(Object command, ChannelPromise promise, boolean flush) {
queue.add(new QueuedCommand(command, promise));
if (flush) {
scheduleFlush();
}
+ return promise;
}
/**
diff --git a/netty/src/test/java/io/grpc/transport/netty/NettyClientStreamTest.java b/netty/src/test/java/io/grpc/transport/netty/NettyClientStreamTest.java
index ca8ec81..32fd2ec 100644
--- a/netty/src/test/java/io/grpc/transport/netty/NettyClientStreamTest.java
+++ b/netty/src/test/java/io/grpc/transport/netty/NettyClientStreamTest.java
@@ -44,7 +44,6 @@
import static org.mockito.Matchers.eq;
import static org.mockito.Matchers.same;
import static org.mockito.Mockito.doAnswer;
-import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.reset;
@@ -54,6 +53,7 @@
import io.grpc.Metadata;
import io.grpc.Status;
import io.grpc.transport.ClientStreamListener;
+
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelPromise;
@@ -328,7 +328,7 @@
return null;
}
}).when(writeQueue).enqueue(any(), any(ChannelPromise.class), anyBoolean());
- doNothing().when(writeQueue).enqueue(any(), anyBoolean());
+ when(writeQueue.enqueue(any(), anyBoolean())).thenReturn(future);
NettyClientStream stream = new NettyClientStream(listener, channel, handler);
assertTrue(stream.canSend());
assertTrue(stream.canReceive());
diff --git a/netty/src/test/java/io/grpc/transport/netty/NettyClientTransportTest.java b/netty/src/test/java/io/grpc/transport/netty/NettyClientTransportTest.java
new file mode 100644
index 0000000..f4e504d
--- /dev/null
+++ b/netty/src/test/java/io/grpc/transport/netty/NettyClientTransportTest.java
@@ -0,0 +1,255 @@
+/*
+ * Copyright 2014, Google Inc. All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ *
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+package io.grpc.transport.netty;
+
+import static com.google.common.base.Charsets.UTF_8;
+import static io.netty.handler.codec.http2.Http2CodecUtil.DEFAULT_WINDOW_SIZE;
+
+import com.google.common.io.ByteStreams;
+import com.google.common.util.concurrent.SettableFuture;
+
+import io.grpc.Marshaller;
+import io.grpc.Metadata;
+import io.grpc.MethodDescriptor;
+import io.grpc.MethodType;
+import io.grpc.Status;
+import io.grpc.testing.TestUtils;
+import io.grpc.transport.ClientStream;
+import io.grpc.transport.ClientStreamListener;
+import io.grpc.transport.ClientTransport;
+import io.grpc.transport.ServerListener;
+import io.grpc.transport.ServerStream;
+import io.grpc.transport.ServerStreamListener;
+import io.grpc.transport.ServerTransport;
+import io.grpc.transport.ServerTransportListener;
+
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.nio.NioServerSocketChannel;
+import io.netty.channel.socket.nio.NioSocketChannel;
+import io.netty.handler.ssl.SslContext;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+
+import java.io.ByteArrayInputStream;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Tests for {@link NettyClientTransport}.
+ */
+@RunWith(JUnit4.class)
+public class NettyClientTransportTest {
+ private static final String MESSAGE = "hello";
+
+ @Mock
+ private ClientTransport.Listener clientTransportListener;
+
+ private final List<NettyClientTransport> transports = new ArrayList<NettyClientTransport>();
+ private NioEventLoopGroup group;
+ private InetSocketAddress address;
+ private NettyServer server;
+
+ @Before
+ public void setup() throws Exception {
+ MockitoAnnotations.initMocks(this);
+
+ group = new NioEventLoopGroup(1);
+
+ // Start the server.
+ address = TestUtils.testServerAddress(TestUtils.pickUnusedPort());
+ File serverCert = TestUtils.loadCert("server1.pem");
+ File key = TestUtils.loadCert("server1.key");
+ SslContext serverContext = GrpcSslContexts.forServer(serverCert, key).build();
+ server = new NettyServer(address, NioServerSocketChannel.class,
+ group, group, serverContext, 100, DEFAULT_WINDOW_SIZE, DEFAULT_WINDOW_SIZE);
+ server.start(new TestServerListener());
+ }
+
+ @After
+ public void teardown() throws Exception {
+ for (NettyClientTransport transport : transports) {
+ transport.shutdown();
+ }
+
+ if (server != null) {
+ server.shutdown();
+ }
+
+ group.shutdownGracefully(0, 10, TimeUnit.SECONDS);
+ }
+
+ /**
+ * Verifies that we can create multiple TLS client transports from the same builder.
+ */
+ @Test
+ public void creatingMultipleTlsTransportsShouldSucceed() throws Exception {
+ // Create the protocol negotiator.
+ File clientCert = TestUtils.loadCert("ca.pem");
+ SslContext clientContext = GrpcSslContexts.forClient().trustManager(clientCert).build();
+ ProtocolNegotiator negotiator = ProtocolNegotiators.tls(clientContext, address);
+
+ // Create a couple client transports.
+ for (int index = 0; index < 2; ++index) {
+ NettyClientTransport transport = new NettyClientTransport(address, NioSocketChannel.class,
+ group, negotiator, DEFAULT_WINDOW_SIZE, DEFAULT_WINDOW_SIZE);
+ transports.add(transport);
+ transport.start(clientTransportListener);
+ }
+
+ // Send a single RPC on each transport.
+ final List<SettableFuture> rpcFutures = new ArrayList<SettableFuture>(transports.size());
+ MethodDescriptor<String, String> method = MethodDescriptor.create(MethodType.UNARY,
+ "/testService/test", 10, TimeUnit.SECONDS, StringMarshaller.INSTANCE,
+ StringMarshaller.INSTANCE);
+ for (NettyClientTransport transport : transports) {
+ SettableFuture rpcFuture = SettableFuture.create();
+ rpcFutures.add(rpcFuture);
+ ClientStream stream = transport.newStream(method, new Metadata.Headers(),
+ new TestClientStreamListener(rpcFuture));
+ stream.request(1);
+ stream.writeMessage(messageStream());
+ stream.halfClose();
+ }
+
+ // Wait for the RPCs to complete.
+ for (SettableFuture rpcFuture : rpcFutures) {
+ rpcFuture.get(10, TimeUnit.SECONDS);
+ }
+ }
+
+ private static InputStream messageStream() {
+ return new ByteArrayInputStream(MESSAGE.getBytes());
+ }
+
+ private static class TestClientStreamListener implements ClientStreamListener {
+ private final SettableFuture<?> future;
+
+ TestClientStreamListener(SettableFuture<?> future) {
+ this.future = future;
+ }
+
+ @Override
+ public void headersRead(Metadata.Headers headers) {
+ }
+
+ @Override
+ public void closed(Status status, Metadata.Trailers trailers) {
+ if (status.isOk()) {
+ future.set(null);
+ } else {
+ future.setException(status.asException());
+ }
+ }
+
+ @Override
+ public void messageRead(InputStream message) {
+ }
+
+ @Override
+ public void onReady() {
+ }
+ }
+
+ private static class TestServerListener implements ServerListener {
+
+ @Override
+ public ServerTransportListener transportCreated(final ServerTransport transport) {
+ return new ServerTransportListener() {
+
+ @Override
+ public ServerStreamListener streamCreated(final ServerStream stream, String method,
+ Metadata.Headers headers) {
+ stream.request(1);
+ return new ServerStreamListener() {
+
+ @Override
+ public void messageRead(InputStream message) {
+ // Just echo back the message.
+ stream.writeMessage(messageStream());
+ }
+
+ @Override
+ public void onReady() {
+ }
+
+ @Override
+ public void halfClosed() {
+ // Just close when the client closes.
+ stream.close(Status.OK, new Metadata.Trailers());
+ }
+
+ @Override
+ public void closed(Status status) {
+ }
+ };
+ }
+
+ @Override
+ public void transportTerminated() {
+ }
+ };
+ }
+
+ @Override
+ public void serverShutdown() {
+ }
+ }
+
+ private static class StringMarshaller implements Marshaller<String> {
+ static final StringMarshaller INSTANCE = new StringMarshaller();
+
+ @Override
+ public InputStream stream(String value) {
+ return new ByteArrayInputStream(value.getBytes(UTF_8));
+ }
+
+ @Override
+ public String parse(InputStream stream) {
+ try {
+ return new String(ByteStreams.toByteArray(stream), UTF_8);
+ } catch (IOException ex) {
+ throw new RuntimeException(ex);
+ }
+ }
+ }
+}
diff --git a/netty/src/test/java/io/grpc/transport/netty/NettyServerStreamTest.java b/netty/src/test/java/io/grpc/transport/netty/NettyServerStreamTest.java
index 61997ad..b5ec169 100644
--- a/netty/src/test/java/io/grpc/transport/netty/NettyServerStreamTest.java
+++ b/netty/src/test/java/io/grpc/transport/netty/NettyServerStreamTest.java
@@ -40,7 +40,6 @@
import static org.mockito.Matchers.same;
import static org.mockito.Mockito.atLeastOnce;
import static org.mockito.Mockito.doAnswer;
-import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.reset;
import static org.mockito.Mockito.verify;
@@ -51,6 +50,7 @@
import io.grpc.Metadata;
import io.grpc.Status;
import io.grpc.transport.ServerStreamListener;
+
import io.netty.buffer.EmptyByteBuf;
import io.netty.buffer.UnpooledByteBufAllocator;
import io.netty.channel.ChannelPromise;
@@ -246,7 +246,7 @@
return null;
}
}).when(writeQueue).enqueue(any(), any(ChannelPromise.class), anyBoolean());
- doNothing().when(writeQueue).enqueue(any(), anyBoolean());
+ when(writeQueue.enqueue(any(), anyBoolean())).thenReturn(future);
NettyServerStream stream = new NettyServerStream(channel, http2Stream, handler);
stream.setListener(serverListener);
assertTrue(stream.canReceive());
diff --git a/testing/src/main/java/io/grpc/testing/TestUtils.java b/testing/src/main/java/io/grpc/testing/TestUtils.java
index 6d305ed..9efa5ee 100644
--- a/testing/src/main/java/io/grpc/testing/TestUtils.java
+++ b/testing/src/main/java/io/grpc/testing/TestUtils.java
@@ -45,7 +45,10 @@
import java.io.FileWriter;
import java.io.IOException;
import java.io.InputStream;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
import java.net.ServerSocket;
+import java.net.UnknownHostException;
import java.security.KeyStore;
import java.security.cert.CertificateFactory;
import java.security.cert.X509Certificate;
@@ -62,6 +65,7 @@
* Common utility functions useful for writing tests.
*/
public class TestUtils {
+ public static final String TEST_SERVER_HOST = "foo.test.google.fr";
/**
* Echo the request headers from a client into response headers and trailers. Useful for
@@ -120,6 +124,32 @@
}
/**
+ * Creates a new {@link InetSocketAddress} that overrides the host with {@link #TEST_SERVER_HOST}.
+ */
+ public static InetSocketAddress testServerAddress(String host, int port) {
+ try {
+ InetAddress inetAddress = InetAddress.getByName(host);
+ inetAddress = InetAddress.getByAddress(TEST_SERVER_HOST, inetAddress.getAddress());
+ return new InetSocketAddress(inetAddress, port);
+ } catch (UnknownHostException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ /**
+ * Creates a new {@link InetSocketAddress} on localhost that overrides the host with
+ * {@link #TEST_SERVER_HOST}.
+ */
+ public static InetSocketAddress testServerAddress(int port) {
+ try {
+ InetAddress inetAddress = InetAddress.getByName("localhost");
+ inetAddress = InetAddress.getByAddress(TEST_SERVER_HOST, inetAddress.getAddress());
+ return new InetSocketAddress(inetAddress, port);
+ } catch (UnknownHostException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ /**
* Load a file from the resources folder.
*
* @param name name of a file in src/main/resources/certs.