core: emit lists of lists from NameResolver
diff --git a/core/src/main/java/io/grpc/DnsNameResolver.java b/core/src/main/java/io/grpc/DnsNameResolver.java
index ae5fa04..8d5b211 100644
--- a/core/src/main/java/io/grpc/DnsNameResolver.java
+++ b/core/src/main/java/io/grpc/DnsNameResolver.java
@@ -42,6 +42,8 @@
import java.net.URI;
import java.net.UnknownHostException;
import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
@@ -154,14 +156,15 @@
savedListener.onError(Status.UNAVAILABLE.withCause(e));
return;
}
- ArrayList<ResolvedServerInfo> servers =
+ List<ResolvedServerInfo> servers =
new ArrayList<ResolvedServerInfo>(inetAddrs.length);
for (int i = 0; i < inetAddrs.length; i++) {
InetAddress inetAddr = inetAddrs[i];
servers.add(
new ResolvedServerInfo(new InetSocketAddress(inetAddr, port), Attributes.EMPTY));
}
- savedListener.onUpdate(servers, Attributes.EMPTY);
+ savedListener.onUpdate(
+ Collections.singletonList(servers), Attributes.EMPTY);
} finally {
synchronized (DnsNameResolver.this) {
resolving = false;
diff --git a/core/src/main/java/io/grpc/SimpleLoadBalancerFactory.java b/core/src/main/java/io/grpc/DummyLoadBalancerFactory.java
similarity index 81%
rename from core/src/main/java/io/grpc/SimpleLoadBalancerFactory.java
rename to core/src/main/java/io/grpc/DummyLoadBalancerFactory.java
index ebeff49..a8bc19b 100644
--- a/core/src/main/java/io/grpc/SimpleLoadBalancerFactory.java
+++ b/core/src/main/java/io/grpc/DummyLoadBalancerFactory.java
@@ -42,30 +42,30 @@
import javax.annotation.concurrent.GuardedBy;
/**
- * A {@link LoadBalancer} that provides simple round-robin and pick-first routing mechanism over the
- * addresses from the {@link NameResolver}.
+ * A {@link LoadBalancer} that provides no load balancing mechanism over the
+ * addresses from the {@link NameResolver}. The channel's default behavior
+ * (currently pick-first) is used for all addresses found.
*/
-// TODO(zhangkun83): Only pick-first is implemented. We need to implement round-robin.
@ExperimentalApi("https://github.com/grpc/grpc-java/issues/1771")
-public final class SimpleLoadBalancerFactory extends LoadBalancer.Factory {
+public final class DummyLoadBalancerFactory extends LoadBalancer.Factory {
- private static final SimpleLoadBalancerFactory instance = new SimpleLoadBalancerFactory();
+ private static final DummyLoadBalancerFactory instance = new DummyLoadBalancerFactory();
- private SimpleLoadBalancerFactory() {
+ private DummyLoadBalancerFactory() {
}
- public static SimpleLoadBalancerFactory getInstance() {
+ public static DummyLoadBalancerFactory getInstance() {
return instance;
}
@Override
public <T> LoadBalancer<T> newLoadBalancer(String serviceName, TransportManager<T> tm) {
- return new SimpleLoadBalancer<T>(tm);
+ return new DummyLoadBalancer<T>(tm);
}
- private static class SimpleLoadBalancer<T> extends LoadBalancer<T> {
+ private static class DummyLoadBalancer<T> extends LoadBalancer<T> {
private static final Status SHUTDOWN_STATUS =
- Status.UNAVAILABLE.augmentDescription("SimpleLoadBalancer has shut down");
+ Status.UNAVAILABLE.augmentDescription("DummyLoadBalancer has shut down");
private final Object lock = new Object();
@@ -80,7 +80,7 @@
private final TransportManager<T> tm;
- private SimpleLoadBalancer(TransportManager<T> tm) {
+ private DummyLoadBalancer(TransportManager<T> tm) {
this.tm = tm;
}
@@ -107,17 +107,18 @@
@Override
public void handleResolvedAddresses(
- List<ResolvedServerInfo> updatedServers, Attributes config) {
+ List<? extends List<ResolvedServerInfo>> updatedServers, Attributes config) {
InterimTransport<T> savedInterimTransport;
final EquivalentAddressGroup newAddresses;
synchronized (lock) {
if (closed) {
return;
}
- ArrayList<SocketAddress> newAddressList =
- new ArrayList<SocketAddress>(updatedServers.size());
- for (ResolvedServerInfo server : updatedServers) {
- newAddressList.add(server.getAddress());
+ ArrayList<SocketAddress> newAddressList = new ArrayList<SocketAddress>();
+ for (List<ResolvedServerInfo> servers : updatedServers) {
+ for (ResolvedServerInfo server : servers) {
+ newAddressList.add(server.getAddress());
+ }
}
newAddresses = new EquivalentAddressGroup(newAddressList);
if (newAddresses.equals(addresses)) {
diff --git a/core/src/main/java/io/grpc/LoadBalancer.java b/core/src/main/java/io/grpc/LoadBalancer.java
index 12b720e..e0a8447 100644
--- a/core/src/main/java/io/grpc/LoadBalancer.java
+++ b/core/src/main/java/io/grpc/LoadBalancer.java
@@ -66,14 +66,17 @@
public void shutdown() { }
/**
- * Handles newly resolved addresses and service config from name resolution system.
+ * Handles newly resolved addresses and service config from name resolution system. Sublists
+ * should be considered equivalent with an {@link EquivalentAddressGroup}, but may be flattened
+ * into a single list if needed.
*
* <p>Implementations should not modify the given {@code servers}.
*
- * @param servers the resolved server addresses. Never empty.
+ * @param servers the resolved server addresses, never empty.
* @param config extra configuration data from naming system.
*/
- public void handleResolvedAddresses(List<ResolvedServerInfo> servers, Attributes config) { }
+ public void handleResolvedAddresses(List<? extends List<ResolvedServerInfo>> servers,
+ Attributes config) { }
/**
* Handles an error from the name resolution system.
diff --git a/core/src/main/java/io/grpc/ManagedChannelBuilder.java b/core/src/main/java/io/grpc/ManagedChannelBuilder.java
index 93bce2f..bae1cae 100644
--- a/core/src/main/java/io/grpc/ManagedChannelBuilder.java
+++ b/core/src/main/java/io/grpc/ManagedChannelBuilder.java
@@ -157,8 +157,8 @@
/**
* Provides a custom {@link LoadBalancer.Factory} for the channel.
*
- * <p>If this method is not called, the builder will use {@link SimpleLoadBalancerFactory} for the
- * channel.
+ * <p>If this method is not called, the builder will use {@link DummyLoadBalancerFactory}
+ * for the channel.
*/
@ExperimentalApi("https://github.com/grpc/grpc-java/issues/1771")
public abstract T loadBalancerFactory(LoadBalancer.Factory loadBalancerFactory);
diff --git a/core/src/main/java/io/grpc/NameResolver.java b/core/src/main/java/io/grpc/NameResolver.java
index 2699fde..0189c52 100644
--- a/core/src/main/java/io/grpc/NameResolver.java
+++ b/core/src/main/java/io/grpc/NameResolver.java
@@ -120,10 +120,12 @@
*
* <p>Implementations will not modify the given {@code servers}.
*
- * @param servers the resolved server addresses. An empty list will trigger {@link #onError}
+ * @param servers the resolved server addresses. Sublists should be considered to be
+ * an {@link EquivalentAddressGroup}. An empty list or all sublists being empty
+ * will trigger {@link #onError}
* @param config extra configuration data from naming system
*/
- void onUpdate(List<ResolvedServerInfo> servers, Attributes config);
+ void onUpdate(List<? extends List<ResolvedServerInfo>> servers, Attributes config);
/**
* Handles an error from the resolver.
diff --git a/core/src/main/java/io/grpc/internal/AbstractManagedChannelImplBuilder.java b/core/src/main/java/io/grpc/internal/AbstractManagedChannelImplBuilder.java
index 4cb74e9..32dbd95 100644
--- a/core/src/main/java/io/grpc/internal/AbstractManagedChannelImplBuilder.java
+++ b/core/src/main/java/io/grpc/internal/AbstractManagedChannelImplBuilder.java
@@ -41,12 +41,12 @@
import io.grpc.ClientInterceptor;
import io.grpc.CompressorRegistry;
import io.grpc.DecompressorRegistry;
+import io.grpc.DummyLoadBalancerFactory;
import io.grpc.LoadBalancer;
import io.grpc.ManagedChannelBuilder;
import io.grpc.NameResolver;
import io.grpc.NameResolverRegistry;
import io.grpc.ResolvedServerInfo;
-import io.grpc.SimpleLoadBalancerFactory;
import java.net.SocketAddress;
import java.net.URI;
@@ -213,7 +213,7 @@
new ExponentialBackoffPolicy.Provider(),
firstNonNull(nameResolverFactory, NameResolverRegistry.getDefaultRegistry()),
getNameResolverParams(),
- firstNonNull(loadBalancerFactory, SimpleLoadBalancerFactory.getInstance()),
+ firstNonNull(loadBalancerFactory, DummyLoadBalancerFactory.getInstance()),
transportFactory,
firstNonNull(decompressorRegistry, DecompressorRegistry.getDefaultInstance()),
firstNonNull(compressorRegistry, CompressorRegistry.getDefaultInstance()),
@@ -279,7 +279,8 @@
@Override
public void start(final Listener listener) {
listener.onUpdate(
- Collections.singletonList(new ResolvedServerInfo(address, Attributes.EMPTY)),
+ Collections.singletonList(
+ Collections.singletonList(new ResolvedServerInfo(address, Attributes.EMPTY))),
Attributes.EMPTY);
}
diff --git a/core/src/main/java/io/grpc/internal/ManagedChannelImpl.java b/core/src/main/java/io/grpc/internal/ManagedChannelImpl.java
index 3618565..d9d97fc 100644
--- a/core/src/main/java/io/grpc/internal/ManagedChannelImpl.java
+++ b/core/src/main/java/io/grpc/internal/ManagedChannelImpl.java
@@ -175,8 +175,8 @@
this.nameResolver.start(new NameResolver.Listener() {
@Override
- public void onUpdate(List<ResolvedServerInfo> servers, Attributes config) {
- if (servers.isEmpty()) {
+ public void onUpdate(List<? extends List<ResolvedServerInfo>> servers, Attributes config) {
+ if (serversAreEmpty(servers)) {
onError(Status.UNAVAILABLE.withDescription("NameResolver returned an empty list"));
} else {
try {
@@ -201,6 +201,16 @@
}
}
+ private static boolean serversAreEmpty(List<? extends List<ResolvedServerInfo>> servers) {
+ for (List<ResolvedServerInfo> serverInfos : servers) {
+ if (!serverInfos.isEmpty()) {
+ return false;
+ }
+ }
+
+ return true;
+ }
+
@VisibleForTesting
static NameResolver getNameResolver(String target, NameResolver.Factory nameResolverFactory,
Attributes nameResolverParams) {
diff --git a/core/src/test/java/io/grpc/DnsNameResolverTest.java b/core/src/test/java/io/grpc/DnsNameResolverTest.java
index f9c2f31..3a3b7d9 100644
--- a/core/src/test/java/io/grpc/DnsNameResolverTest.java
+++ b/core/src/test/java/io/grpc/DnsNameResolverTest.java
@@ -40,6 +40,8 @@
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;
+import com.google.common.collect.Iterables;
+
import io.grpc.internal.FakeClock;
import io.grpc.internal.SharedResourceHolder.Resource;
@@ -102,7 +104,7 @@
@Mock
private NameResolver.Listener mockListener;
@Captor
- private ArgumentCaptor<List<ResolvedServerInfo>> resultCaptor;
+ private ArgumentCaptor<List<List<ResolvedServerInfo>>> resultCaptor;
@Captor
private ArgumentCaptor<Status> statusCaptor;
@@ -149,14 +151,14 @@
assertEquals(1, fakeExecutor.runDueTasks());
verify(mockListener).onUpdate(resultCaptor.capture(), any(Attributes.class));
assertEquals(name, resolver.invocations.poll());
- assertAnswerMatches(answer1, 81, resultCaptor.getValue());
+ assertAnswerMatches(answer1, 81, Iterables.getOnlyElement(resultCaptor.getValue()));
assertEquals(0, fakeClock.numPendingTasks());
resolver.refresh();
assertEquals(1, fakeExecutor.runDueTasks());
verify(mockListener, times(2)).onUpdate(resultCaptor.capture(), any(Attributes.class));
assertEquals(name, resolver.invocations.poll());
- assertAnswerMatches(answer2, 81, resultCaptor.getValue());
+ assertAnswerMatches(answer2, 81, Iterables.getOnlyElement(resultCaptor.getValue()));
assertEquals(0, fakeClock.numPendingTasks());
resolver.shutdown();
@@ -201,7 +203,7 @@
assertEquals(1, fakeExecutor.runDueTasks());
verify(mockListener).onUpdate(resultCaptor.capture(), any(Attributes.class));
assertEquals(name, resolver.invocations.poll());
- assertAnswerMatches(answer, 81, resultCaptor.getValue());
+ assertAnswerMatches(answer, 81, Iterables.getOnlyElement(resultCaptor.getValue()));
verifyNoMoreInteractions(mockListener);
}
@@ -229,7 +231,7 @@
assertEquals(0, fakeClock.numPendingTasks());
verify(mockListener).onUpdate(resultCaptor.capture(), any(Attributes.class));
assertEquals(name, resolver.invocations.poll());
- assertAnswerMatches(answer, 81, resultCaptor.getValue());
+ assertAnswerMatches(answer, 81, Iterables.getOnlyElement(resultCaptor.getValue()));
verifyNoMoreInteractions(mockListener);
}
diff --git a/core/src/test/java/io/grpc/SimpleLoadBalancerTest.java b/core/src/test/java/io/grpc/DummyLoadBalancerTest.java
similarity index 92%
rename from core/src/test/java/io/grpc/SimpleLoadBalancerTest.java
rename to core/src/test/java/io/grpc/DummyLoadBalancerTest.java
index a8e4100..ad4bd09 100644
--- a/core/src/test/java/io/grpc/SimpleLoadBalancerTest.java
+++ b/core/src/test/java/io/grpc/DummyLoadBalancerTest.java
@@ -55,13 +55,14 @@
import java.net.SocketAddress;
import java.util.ArrayList;
+import java.util.List;
-/** Unit test for {@link SimpleLoadBalancerFactory}. */
+/** Unit test for {@link DummyLoadBalancerFactory}. */
@RunWith(JUnit4.class)
-public class SimpleLoadBalancerTest {
+public class DummyLoadBalancerTest {
private LoadBalancer<Transport> loadBalancer;
- private ArrayList<ResolvedServerInfo> servers;
+ private List<List<ResolvedServerInfo>> servers;
private EquivalentAddressGroup addressGroup;
@Mock private TransportManager<Transport> mockTransportManager;
@@ -73,13 +74,14 @@
@Before
public void setUp() {
MockitoAnnotations.initMocks(this);
- loadBalancer = SimpleLoadBalancerFactory.getInstance().newLoadBalancer(
+ loadBalancer = DummyLoadBalancerFactory.getInstance().newLoadBalancer(
"fakeservice", mockTransportManager);
- servers = new ArrayList<ResolvedServerInfo>();
+ servers = new ArrayList<List<ResolvedServerInfo>>();
+ servers.add(new ArrayList<ResolvedServerInfo>());
ArrayList<SocketAddress> addresses = new ArrayList<SocketAddress>();
for (int i = 0; i < 3; i++) {
SocketAddress addr = new FakeSocketAddress("server" + i);
- servers.add(new ResolvedServerInfo(addr, Attributes.EMPTY));
+ servers.get(0).add(new ResolvedServerInfo(addr, Attributes.EMPTY));
addresses.add(addr);
}
addressGroup = new EquivalentAddressGroup(addresses);
diff --git a/core/src/test/java/io/grpc/internal/ManagedChannelImplTest.java b/core/src/test/java/io/grpc/internal/ManagedChannelImplTest.java
index 2073e8c..9245a11 100644
--- a/core/src/test/java/io/grpc/internal/ManagedChannelImplTest.java
+++ b/core/src/test/java/io/grpc/internal/ManagedChannelImplTest.java
@@ -62,6 +62,7 @@
import io.grpc.Compressor;
import io.grpc.CompressorRegistry;
import io.grpc.DecompressorRegistry;
+import io.grpc.DummyLoadBalancerFactory;
import io.grpc.IntegerMarshaller;
import io.grpc.LoadBalancer;
import io.grpc.ManagedChannel;
@@ -69,7 +70,6 @@
import io.grpc.MethodDescriptor;
import io.grpc.NameResolver;
import io.grpc.ResolvedServerInfo;
-import io.grpc.SimpleLoadBalancerFactory;
import io.grpc.Status;
import io.grpc.StringMarshaller;
import io.grpc.TransportManager;
@@ -120,7 +120,7 @@
private final SocketAddress socketAddress = new SocketAddress() {};
private final ResolvedServerInfo server = new ResolvedServerInfo(socketAddress, Attributes.EMPTY);
private SpyingLoadBalancerFactory loadBalancerFactory =
- new SpyingLoadBalancerFactory(SimpleLoadBalancerFactory.getInstance());
+ new SpyingLoadBalancerFactory(DummyLoadBalancerFactory.getInstance());
@Rule public final ExpectedException thrown = ExpectedException.none();
@@ -494,7 +494,7 @@
}
@Test
- public void nameResolverReturnsEmptyList() {
+ public void nameResolverReturnsEmptySubLists() {
String errorDescription = "NameResolver returned an empty list";
// Name resolution is started as soon as channel is created
@@ -527,7 +527,7 @@
assertEquals(1, loadBalancerFactory.balancers.size());
LoadBalancer<?> loadBalancer = loadBalancerFactory.balancers.get(0);
doThrow(ex).when(loadBalancer).handleResolvedAddresses(
- Matchers.<List<ResolvedServerInfo>>anyObject(), any(Attributes.class));
+ Matchers.<List<List<ResolvedServerInfo>>>anyObject(), any(Attributes.class));
// NameResolver returns addresses.
nameResolverFactory.allResolved();
@@ -806,7 +806,7 @@
}
void resolved() {
- listener.onUpdate(servers, Attributes.EMPTY);
+ listener.onUpdate(Collections.singletonList(servers), Attributes.EMPTY);
}
@Override public void shutdown() {
diff --git a/grpclb/src/main/java/io/grpc/grpclb/GrpclbLoadBalancer.java b/grpclb/src/main/java/io/grpc/grpclb/GrpclbLoadBalancer.java
index 90c73a6..22f4a86 100644
--- a/grpclb/src/main/java/io/grpc/grpclb/GrpclbLoadBalancer.java
+++ b/grpclb/src/main/java/io/grpc/grpclb/GrpclbLoadBalancer.java
@@ -150,14 +150,16 @@
@Override
public void handleResolvedAddresses(
- List<ResolvedServerInfo> updatedServers, Attributes config) {
+ List<? extends List<ResolvedServerInfo>> updatedServers, Attributes config) {
synchronized (lock) {
if (closed) {
return;
}
ArrayList<SocketAddress> addrs = new ArrayList<SocketAddress>(updatedServers.size());
- for (ResolvedServerInfo serverInfo : updatedServers) {
- addrs.add(serverInfo.getAddress());
+ for (List<ResolvedServerInfo> serverInfos : updatedServers) {
+ for (ResolvedServerInfo serverInfo : serverInfos) {
+ addrs.add(serverInfo.getAddress());
+ }
}
EquivalentAddressGroup newLbAddresses = new EquivalentAddressGroup(addrs);
if (!newLbAddresses.equals(lbAddresses)) {
diff --git a/grpclb/src/test/java/io/grpc/grpclb/GrpclbLoadBalancerTest.java b/grpclb/src/test/java/io/grpc/grpclb/GrpclbLoadBalancerTest.java
index d2f8878..c37ef1a 100644
--- a/grpclb/src/test/java/io/grpc/grpclb/GrpclbLoadBalancerTest.java
+++ b/grpclb/src/test/java/io/grpc/grpclb/GrpclbLoadBalancerTest.java
@@ -430,7 +430,8 @@
lbAddressGroup = buildAddressGroup(lbServerInfo);
Transport lbTransport = new Transport();
when(mockTransportManager.getTransport(eq(lbAddressGroup))).thenReturn(lbTransport);
- loadBalancer.handleResolvedAddresses(Collections.singletonList(lbServerInfo), Attributes.EMPTY);
+ loadBalancer.handleResolvedAddresses(
+ Collections.singletonList(Collections.singletonList(lbServerInfo)), Attributes.EMPTY);
verify(mockTransportManager).getTransport(eq(lbAddressGroup));
return lbTransport;
}