core: fix a discrepancy in state transition.
Channel state API doesn't allow a TRANSIENT_FAILURE->IDLE edge.
Change TransportSet to always transition to CONNECTING after
TRANSIENT_FAILURE.
This behavior, combined with the fact that it never uses IDLE_TIMEOUT to
transition from READY to IDLE, effectively makes TransportSet
Channel-state API-compliant under an infinite IDLE_TIMEOUT.
Also set the default IDLE_TIMEOUT to 30min.
diff --git a/core/src/main/java/io/grpc/internal/AbstractManagedChannelImplBuilder.java b/core/src/main/java/io/grpc/internal/AbstractManagedChannelImplBuilder.java
index 3c28004..39d78b6 100644
--- a/core/src/main/java/io/grpc/internal/AbstractManagedChannelImplBuilder.java
+++ b/core/src/main/java/io/grpc/internal/AbstractManagedChannelImplBuilder.java
@@ -78,6 +78,12 @@
static final long IDLE_MODE_MAX_TIMEOUT_DAYS = 30;
/**
+ * The default idle timeout.
+ */
+ @VisibleForTesting
+ static final long IDLE_MODE_DEFAULT_TIMEOUT_MILLIS = TimeUnit.MINUTES.toMillis(30);
+
+ /**
* An idle timeout smaller than this would be capped to it.
*/
@VisibleForTesting
@@ -110,7 +116,7 @@
@Nullable
private CompressorRegistry compressorRegistry;
- private long idleTimeoutMillis = ManagedChannelImpl.IDLE_TIMEOUT_MILLIS_DISABLE;
+ private long idleTimeoutMillis = IDLE_MODE_DEFAULT_TIMEOUT_MILLIS;
protected AbstractManagedChannelImplBuilder(String target) {
this.target = Preconditions.checkNotNull(target, "target");
diff --git a/core/src/main/java/io/grpc/internal/TransportSet.java b/core/src/main/java/io/grpc/internal/TransportSet.java
index 8397b53..bd8098e 100644
--- a/core/src/main/java/io/grpc/internal/TransportSet.java
+++ b/core/src/main/java/io/grpc/internal/TransportSet.java
@@ -252,42 +252,17 @@
public void run() {
try {
delayedTransport.endBackoff();
- boolean shutdownDelayedTransport = false;
Runnable runnable = null;
- // TransportSet as a channel layer class should not call into transport methods while
- // holding the lock, thus we call hasPendingStreams() outside of the lock. It will cause
- // a _benign_ race where the TransportSet may transition to CONNECTING when there is not
- // pending stream.
- boolean hasPendingStreams = delayedTransport.hasPendingStreams();
synchronized (lock) {
reconnectTask = null;
- if (hasPendingStreams) {
- if (!shutdown) {
- stateManager.gotoState(ConnectivityState.CONNECTING);
- }
- runnable = startNewTransport(delayedTransport);
- } else {
- if (!shutdown) {
- stateManager.gotoState(ConnectivityState.IDLE);
- }
- activeTransport = null;
- shutdownDelayedTransport = true;
+ if (!shutdown) {
+ stateManager.gotoState(ConnectivityState.CONNECTING);
}
+ runnable = startNewTransport(delayedTransport);
}
if (runnable != null) {
runnable.run();
}
- if (shutdownDelayedTransport) {
- delayedTransport.setTransportSupplier(new Supplier<ClientTransport>() {
- @Override
- public ClientTransport get() {
- // This will wrap one DelayedStream in another, but it only happens if we win a
- // race and can happen to a stream at most once.
- return obtainActiveTransport();
- }
- });
- delayedTransport.shutdown();
- }
} catch (Throwable t) {
log.log(Level.WARNING, "Exception handling end of backoff", t);
}
diff --git a/core/src/test/java/io/grpc/internal/AbstractManagedChannelImplBuilderTest.java b/core/src/test/java/io/grpc/internal/AbstractManagedChannelImplBuilderTest.java
index 4fba28f..f6d09db 100644
--- a/core/src/test/java/io/grpc/internal/AbstractManagedChannelImplBuilderTest.java
+++ b/core/src/test/java/io/grpc/internal/AbstractManagedChannelImplBuilderTest.java
@@ -75,7 +75,8 @@
Builder builder = new Builder();
- assertEquals(ManagedChannelImpl.IDLE_TIMEOUT_MILLIS_DISABLE, builder.getIdleTimeoutMillis());
+ assertEquals(AbstractManagedChannelImplBuilder.IDLE_MODE_DEFAULT_TIMEOUT_MILLIS,
+ builder.getIdleTimeoutMillis());
builder.idleTimeout(Long.MAX_VALUE, TimeUnit.DAYS);
assertEquals(ManagedChannelImpl.IDLE_TIMEOUT_MILLIS_DISABLE, builder.getIdleTimeoutMillis());
diff --git a/core/src/test/java/io/grpc/internal/TransportSetTest.java b/core/src/test/java/io/grpc/internal/TransportSetTest.java
index a511ad2..24f2a9a 100644
--- a/core/src/test/java/io/grpc/internal/TransportSetTest.java
+++ b/core/src/test/java/io/grpc/internal/TransportSetTest.java
@@ -473,28 +473,21 @@
transports.poll().listener.transportShutdown(Status.UNAVAILABLE);
assertEquals(ConnectivityState.TRANSIENT_FAILURE, transportSet.getState(false));
- // Won't reconnect until requested, even if back-off time has expired
+ // Will always reconnect after back-off
fakeClock.forwardMillis(10);
- assertEquals(ConnectivityState.IDLE, transportSet.getState(false));
- verify(mockTransportFactory, times(transportsCreated))
- .newClientTransport(addr, authority, userAgent);
-
- // Once requested, will reconnect
- transportSet.obtainActiveTransport().newStream(method, new Metadata());
assertEquals(ConnectivityState.CONNECTING, transportSet.getState(false));
verify(mockTransportFactory, times(++transportsCreated))
.newClientTransport(addr, authority, userAgent);
- // Fail this one, too
+ // Make this one proceed
+ transports.peek().listener.transportReady();
+ assertEquals(ConnectivityState.READY, transportSet.getState(false));
+ // Then go-away
transports.poll().listener.transportShutdown(Status.UNAVAILABLE);
- assertEquals(ConnectivityState.TRANSIENT_FAILURE, transportSet.getState(false));
+ assertEquals(ConnectivityState.IDLE, transportSet.getState(false));
- // Request immediately, but will wait for back-off before reconnecting
+ // Request immediately
transportSet.obtainActiveTransport().newStream(method, new Metadata(), waitForReadyCallOptions);
- assertEquals(ConnectivityState.TRANSIENT_FAILURE, transportSet.getState(false));
- verify(mockTransportFactory, times(transportsCreated))
- .newClientTransport(addr, authority, userAgent);
- fakeClock.forwardMillis(100);
assertEquals(ConnectivityState.CONNECTING, transportSet.getState(false));
verify(mockTransportFactory, times(++transportsCreated))
.newClientTransport(addr, authority, userAgent);
@@ -502,40 +495,6 @@
}
@Test
- public void raceTransientFailureAndNewStream() {
- SocketAddress addr = mock(SocketAddress.class);
- createTransportSet(addr);
-
- // Invocation counters
- int transportsCreated = 0;
-
- // Trigger TRANSIENT_FAILURE
- transportSet.obtainActiveTransport().newStream(method, new Metadata());
- verify(mockTransportFactory, times(++transportsCreated))
- .newClientTransport(addr, authority, userAgent);
- transports.poll().listener.transportShutdown(Status.UNAVAILABLE);
- assertEquals(ConnectivityState.TRANSIENT_FAILURE, transportSet.getState(false));
-
- // Won't reconnect without any active streams
- ClientTransport transientFailureTransport = transportSet.obtainActiveTransport();
- assertTrue(transientFailureTransport instanceof DelayedClientTransport);
- transientFailureTransport.newStream(method, new Metadata()).cancel(Status.CANCELLED);
- assertEquals(ConnectivityState.TRANSIENT_FAILURE, transportSet.getState(false));
- fakeClock.forwardMillis(10);
- assertEquals(ConnectivityState.IDLE, transportSet.getState(false));
- verify(mockTransportFactory, times(transportsCreated))
- .newClientTransport(addr, authority, userAgent);
-
- // Lose race (long delay between obtainActiveTransport and newStream); will now reconnect
- transientFailureTransport.newStream(method, new Metadata());
- assertEquals(ConnectivityState.CONNECTING, transportSet.getState(false));
- verify(mockTransportFactory, times(++transportsCreated))
- .newClientTransport(addr, authority, userAgent);
-
- fakeExecutor.runDueTasks(); // Drain new 'real' stream creation; not important to this test.
- }
-
- @Test
public void shutdownBeforeTransportCreatedWithPendingStream() throws Exception {
SocketAddress addr = mock(SocketAddress.class);
createTransportSet(addr);