Trigger name resolution when a live connection closed.
In addition to when all addresses have failed to connect. This is to
cover the case where some addresses have changed in the name system
while some are still there and usable. Without this change, the client
would try to connect old addresses each time it reconnects.
diff --git a/core/src/main/java/io/grpc/internal/ManagedChannelImpl.java b/core/src/main/java/io/grpc/internal/ManagedChannelImpl.java
index 0be3d78..0f37c92 100644
--- a/core/src/main/java/io/grpc/internal/ManagedChannelImpl.java
+++ b/core/src/main/java/io/grpc/internal/ManagedChannelImpl.java
@@ -397,6 +397,11 @@
public void onAllAddressesFailed() {
nameResolver.refresh();
}
+
+ @Override
+ public void onConnectionClosedByServer(Status status) {
+ nameResolver.refresh();
+ }
});
if (log.isLoggable(Level.FINE)) {
log.log(Level.FINE, "[{0}] {1} created for {2}",
diff --git a/core/src/main/java/io/grpc/internal/TransportSet.java b/core/src/main/java/io/grpc/internal/TransportSet.java
index 2922896..74d3dd2 100644
--- a/core/src/main/java/io/grpc/internal/TransportSet.java
+++ b/core/src/main/java/io/grpc/internal/TransportSet.java
@@ -348,6 +348,7 @@
@Override
public void transportShutdown(Status s) {
boolean allAddressesFailed = false;
+ boolean closedByServer = false;
if (log.isLoggable(Level.FINE)) {
log.log(Level.FINE, "[{0}] {1} for {2} is being shutdown with status {3}",
new Object[] {getLogId(), transport.getLogId(), address, s});
@@ -355,7 +356,9 @@
super.transportShutdown(s);
synchronized (lock) {
if (activeTransport == transport) {
+ // This is true only if the transport was ready.
activeTransport = null;
+ closedByServer = !shutdown;
} else if (activeTransport == delayedTransport) {
// Continue reconnect if there are still addresses to try.
// Fail if all addresses have been tried and failed in a row.
@@ -373,6 +376,9 @@
if (allAddressesFailed) {
callback.onAllAddressesFailed();
}
+ if (closedByServer) {
+ callback.onConnectionClosedByServer(s);
+ }
}
@Override
@@ -389,8 +395,20 @@
}
interface Callback {
+ /**
+ * Called when the TransportSet is terminated, which means it's shut down and all transports
+ * have been terminated.
+ */
void onTerminated();
+ /**
+ * Called when all addresses have failed to connect.
+ */
void onAllAddressesFailed();
+
+ /**
+ * Called when a once-live connection is shut down by server-side.
+ */
+ void onConnectionClosedByServer(Status status);
}
}
diff --git a/core/src/test/java/io/grpc/internal/ManagedChannelImplTransportManagerTest.java b/core/src/test/java/io/grpc/internal/ManagedChannelImplTransportManagerTest.java
index 3295e99..e35c467 100644
--- a/core/src/test/java/io/grpc/internal/ManagedChannelImplTransportManagerTest.java
+++ b/core/src/test/java/io/grpc/internal/ManagedChannelImplTransportManagerTest.java
@@ -194,8 +194,11 @@
// Make the second transport ready
transportInfo.listener.transportReady();
verify(rt2, timeout(1000)).newStream(same(method), any(Metadata.class));
+ verify(mockNameResolver, times(0)).refresh();
// Disconnect the second transport
transportInfo.listener.transportShutdown(Status.UNAVAILABLE);
+ // Will trigger NameResolver refresh
+ verify(mockNameResolver).refresh();
// Subsequent getTransport() will use the next address, which is the first one since we have run
// out of addresses.
@@ -213,8 +216,8 @@
// Back-off policy was never consulted.
verify(mockBackoffPolicy, times(0)).nextBackoffMillis();
verifyNoMoreInteractions(mockTransportFactory);
- // Never refreshed NameResolver
- verify(mockNameResolver, times(0)).refresh();
+ // NameResolver was refreshed only once
+ verify(mockNameResolver).refresh();
}
@Test
@@ -238,10 +241,11 @@
// Back-off policy was set initially.
verify(mockBackoffPolicyProvider, times(++backoffReset)).get();
MockClientTransportInfo transportInfo = transports.poll(1, TimeUnit.SECONDS);
+ verify(mockNameResolver, times(nameResolverRefresh)).refresh();
transportInfo.listener.transportReady();
// Then close it
transportInfo.listener.transportShutdown(Status.UNAVAILABLE);
- verify(mockNameResolver, times(nameResolverRefresh)).refresh();
+ verify(mockNameResolver, times(++nameResolverRefresh)).refresh();
// Second pick fails. This is the beginning of a series of failures.
ClientTransport t2 = tm.getTransport(addressGroup);
diff --git a/core/src/test/java/io/grpc/internal/TransportSetTest.java b/core/src/test/java/io/grpc/internal/TransportSetTest.java
index 988b03a..63f4696 100644
--- a/core/src/test/java/io/grpc/internal/TransportSetTest.java
+++ b/core/src/test/java/io/grpc/internal/TransportSetTest.java
@@ -37,6 +37,7 @@
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.same;
@@ -152,8 +153,10 @@
verify(mockTransportFactory, times(++transportsCreated)).newClientTransport(addr, authority);
// Let this one succeed
transports.peek().listener.transportReady();
+ verify(mockTransportSetCallback, never()).onConnectionClosedByServer(any(Status.class));
// And close it
transports.poll().listener.transportShutdown(Status.UNAVAILABLE);
+ verify(mockTransportSetCallback).onConnectionClosedByServer(same(Status.UNAVAILABLE));
verify(mockTransportSetCallback, times(onAllAddressesFailed)).onAllAddressesFailed();
// Back-off is reset, and the next attempt will happen immediately
@@ -164,6 +167,7 @@
// Final checks for consultations on back-off policies
verify(mockBackoffPolicy1, times(backoff1Consulted)).nextBackoffMillis();
verify(mockBackoffPolicy2, times(backoff2Consulted)).nextBackoffMillis();
+ verifyNoMoreInteractions(mockTransportSetCallback);
}
@Test public void twoAddressesReconnect() {
@@ -243,8 +247,10 @@
transports.peek().listener.transportReady();
// Delayed transport will see the connected transport.
assertSame(transports.peek().transport, delayedTransport5.getTransportSupplier().get());
+ verify(mockTransportSetCallback, never()).onConnectionClosedByServer(any(Status.class));
// Then close it.
transports.poll().listener.transportShutdown(Status.UNAVAILABLE);
+ verify(mockTransportSetCallback).onConnectionClosedByServer(same(Status.UNAVAILABLE));
verify(mockTransportSetCallback, times(onAllAddressesFailed)).onAllAddressesFailed();
// First attempt after a successful connection. Reset back-off policy, and start from the first
@@ -259,6 +265,7 @@
verify(mockBackoffPolicy1, times(backoff1Consulted)).nextBackoffMillis();
verify(mockBackoffPolicy2, times(backoff2Consulted)).nextBackoffMillis();
verify(mockBackoffPolicy3, times(backoff3Consulted)).nextBackoffMillis();
+ verifyNoMoreInteractions(mockTransportSetCallback);
}
@Test