core: Port PickFirst & RoundRobin LBs to v2 API (#2479)

diff --git a/core/src/main/java/io/grpc/PickFirstBalancerFactory2.java b/core/src/main/java/io/grpc/PickFirstBalancerFactory2.java
new file mode 100644
index 0000000..706ffdd
--- /dev/null
+++ b/core/src/main/java/io/grpc/PickFirstBalancerFactory2.java
@@ -0,0 +1,168 @@
+/*
+ * Copyright 2016, Google Inc. All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ *    * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ *    * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ *
+ *    * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+package io.grpc;
+
+import static io.grpc.ConnectivityState.SHUTDOWN;
+
+import com.google.common.annotations.VisibleForTesting;
+
+import java.net.SocketAddress;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * A {@link LoadBalancer} that provides no load balancing mechanism over the
+ * addresses from the {@link NameResolver}.  The channel's default behavior
+ * (currently pick-first) is used for all addresses found.
+ */
+@ExperimentalApi("https://github.com/grpc/grpc-java/issues/1771")
+public final class PickFirstBalancerFactory2 extends LoadBalancer2.Factory {
+
+  private static final PickFirstBalancerFactory2 INSTANCE = new PickFirstBalancerFactory2();
+
+  private PickFirstBalancerFactory2() {
+  }
+
+  public static PickFirstBalancerFactory2 getInstance() {
+    return INSTANCE;
+  }
+
+  @Override
+  public LoadBalancer2 newLoadBalancer(LoadBalancer2.Helper helper) {
+    return new PickFirstBalancer(helper);
+  }
+
+  @VisibleForTesting
+  static class PickFirstBalancer extends LoadBalancer2 {
+    private final Helper helper;
+    private Subchannel subchannel;
+
+    public PickFirstBalancer(Helper helper) {
+      this.helper = helper;
+    }
+
+    @Override
+    public void handleResolvedAddresses(List<ResolvedServerInfoGroup> servers,
+        Attributes attributes) {
+      // Flatten servers list received from name resolver into single address group. This means that
+      // as far as load balancer is concerned, there's virtually one single server with multiple
+      // addresses so the connection will be created only for the first address (pick first).
+      EquivalentAddressGroup newEag =
+          flattenResolvedServerInfoGroupsIntoEquivalentAddressGroup(servers);
+      if (subchannel == null || !newEag.equals(subchannel.getAddresses())) {
+        if (subchannel != null) {
+          subchannel.shutdown();
+        }
+
+        subchannel = helper.createSubchannel(newEag, Attributes.EMPTY);
+        helper.updatePicker(new Picker(PickResult.withSubchannel(subchannel)));
+      }
+    }
+
+    @Override
+    public void handleNameResolutionError(Status error) {
+      if (subchannel != null) {
+        subchannel.shutdown();
+        subchannel = null;
+      }
+      // NB(lukaszx0) Whether we should propagate the error unconditionally is arguable. It's fine
+      // for time being.
+      helper.updatePicker(new Picker(PickResult.withError(error)));
+    }
+
+    @Override
+    public void handleSubchannelState(Subchannel subchannel, ConnectivityStateInfo stateInfo) {
+      ConnectivityState currentState = stateInfo.getState();
+      if (subchannel != this.subchannel || currentState == SHUTDOWN) {
+        return;
+      }
+
+      PickResult pickResult;
+      switch (currentState) {
+        case CONNECTING:
+          pickResult = PickResult.withNoResult();
+          break;
+        case READY:
+        case IDLE:
+          pickResult = PickResult.withSubchannel(subchannel);
+          break;
+        case TRANSIENT_FAILURE:
+          pickResult = PickResult.withError(stateInfo.getStatus());
+          break;
+        default:
+          throw new IllegalStateException();
+      }
+
+      helper.updatePicker(new Picker(pickResult));
+    }
+
+    @Override
+    public void shutdown() {
+      if (subchannel != null) {
+        subchannel.shutdown();
+      }
+    }
+
+    /**
+     * Flattens list of ResolvedServerInfoGroup objects into one EquivalentAddressGroup object.
+     */
+    private static EquivalentAddressGroup flattenResolvedServerInfoGroupsIntoEquivalentAddressGroup(
+        List<ResolvedServerInfoGroup> groupList) {
+      List<SocketAddress> addrs = new ArrayList<SocketAddress>();
+      for (ResolvedServerInfoGroup group : groupList) {
+        for (ResolvedServerInfo srv : group.getResolvedServerInfoList()) {
+          addrs.add(srv.getAddress());
+        }
+      }
+      return new EquivalentAddressGroup(addrs);
+    }
+
+  }
+
+  /**
+   * No-op picker which doesn't add any custom picking logic. It just passes already known result
+   * received in constructor.
+   */
+  @VisibleForTesting
+  static class Picker extends LoadBalancer2.SubchannelPicker {
+    private final LoadBalancer2.PickResult result;
+
+    Picker(LoadBalancer2.PickResult result) {
+      this.result = result;
+    }
+
+    @Override
+    public LoadBalancer2.PickResult pickSubchannel(Attributes affinity, Metadata headers) {
+      return result;
+    }
+  }
+}
diff --git a/core/src/main/java/io/grpc/util/RoundRobinLoadBalancerFactory2.java b/core/src/main/java/io/grpc/util/RoundRobinLoadBalancerFactory2.java
new file mode 100644
index 0000000..5dd24e6
--- /dev/null
+++ b/core/src/main/java/io/grpc/util/RoundRobinLoadBalancerFactory2.java
@@ -0,0 +1,283 @@
+/*
+ * Copyright 2016, Google Inc. All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ *    * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ *    * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ *
+ *    * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+package io.grpc.util;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+import static io.grpc.ConnectivityState.IDLE;
+import static io.grpc.ConnectivityState.READY;
+import static io.grpc.ConnectivityState.TRANSIENT_FAILURE;
+
+import com.google.common.annotations.VisibleForTesting;
+
+import io.grpc.Attributes;
+import io.grpc.ConnectivityStateInfo;
+import io.grpc.EquivalentAddressGroup;
+import io.grpc.ExperimentalApi;
+import io.grpc.LoadBalancer;
+import io.grpc.LoadBalancer2;
+import io.grpc.LoadBalancer2.PickResult;
+import io.grpc.LoadBalancer2.Subchannel;
+import io.grpc.LoadBalancer2.SubchannelPicker;
+import io.grpc.Metadata;
+import io.grpc.NameResolver;
+import io.grpc.ResolvedServerInfo;
+import io.grpc.ResolvedServerInfoGroup;
+import io.grpc.Status;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicReference;
+
+import javax.annotation.Nullable;
+
+/**
+ * A {@link LoadBalancer} that provides round-robin load balancing mechanism over the
+ * addresses from the {@link NameResolver}.  The sub-lists received from the name resolver
+ * are considered to be an {@link EquivalentAddressGroup} and each of these sub-lists is
+ * what is then balanced across.
+ */
+@ExperimentalApi("https://github.com/grpc/grpc-java/issues/1771")
+public class RoundRobinLoadBalancerFactory2 extends LoadBalancer2.Factory {
+  private static final RoundRobinLoadBalancerFactory2 INSTANCE =
+      new RoundRobinLoadBalancerFactory2();
+
+  private RoundRobinLoadBalancerFactory2() {
+  }
+
+  public static RoundRobinLoadBalancerFactory2 getInstance() {
+    return INSTANCE;
+  }
+
+  @Override
+  public LoadBalancer2 newLoadBalancer(LoadBalancer2.Helper helper) {
+    return new RoundRobinLoadBalancer(helper);
+  }
+
+  @VisibleForTesting
+  static class RoundRobinLoadBalancer extends LoadBalancer2 {
+    private final Helper helper;
+    private final Map<EquivalentAddressGroup, Subchannel> subchannels =
+        new HashMap<EquivalentAddressGroup, Subchannel>();
+
+    @VisibleForTesting
+    static final Attributes.Key<AtomicReference<ConnectivityStateInfo>> STATE_INFO =
+        Attributes.Key.of("state-info");
+
+    public RoundRobinLoadBalancer(Helper helper) {
+      this.helper = helper;
+    }
+
+    @Override
+    public void handleResolvedAddresses(List<ResolvedServerInfoGroup> servers,
+        Attributes attributes) {
+      Set<EquivalentAddressGroup> currentAddrs = subchannels.keySet();
+      Set<EquivalentAddressGroup> latestAddrs =
+          resolvedServerInfoGroupToEquivalentAddressGroup(servers);
+      Set<EquivalentAddressGroup> addedAddrs = setsDifference(latestAddrs, currentAddrs);
+      Set<EquivalentAddressGroup> removedAddrs = setsDifference(currentAddrs, latestAddrs);
+
+      // NB(lukaszx0): we don't merge `attributes` with `subchannelAttr` because subchannel doesn't
+      // need them. They're describing the resolved server list but we're not taking any action
+      // based on this information.
+      Attributes subchannelAttrs = Attributes.newBuilder()
+          // NB(lukaszx0): because attributes are immutable we can't set new value for the key
+          // after creation but since we can mutate the values we leverge that and set
+          // AtomicReference which will allow mutating state info for given channel.
+          .set(STATE_INFO, new AtomicReference<ConnectivityStateInfo>(
+              ConnectivityStateInfo.forNonError(IDLE)))
+          .build();
+
+      // Create new subchannels for new addresses.
+      for (EquivalentAddressGroup addressGroup : addedAddrs) {
+        Subchannel subchannel = checkNotNull(helper.createSubchannel(addressGroup, subchannelAttrs),
+            "subchannel");
+        subchannels.put(addressGroup, subchannel);
+        subchannel.requestConnection();
+      }
+
+      // Shutdown subchannels for removed addresses.
+      for (EquivalentAddressGroup addressGroup : removedAddrs) {
+        Subchannel subchannel = subchannels.remove(addressGroup);
+        subchannel.shutdown();
+      }
+
+      updatePicker(getAggregatedError());
+    }
+
+    @Override
+    public void handleNameResolutionError(Status error) {
+      updatePicker(error);
+    }
+
+    @Override
+    public void handleSubchannelState(Subchannel subchannel, ConnectivityStateInfo stateInfo) {
+      if (!subchannels.containsValue(subchannel)) {
+        return;
+      }
+      if (stateInfo.getState() == IDLE) {
+        subchannel.requestConnection();
+      }
+      getSubchannelStateInfoRef(subchannel).set(stateInfo);
+      updatePicker(getAggregatedError());
+    }
+
+    @Override
+    public void shutdown() {
+      for (Subchannel subchannel : getSubchannels()) {
+        subchannel.shutdown();
+      }
+    }
+
+    /**
+     * Updates picker with the list of active subchannels (state == READY).
+     */
+    private void updatePicker(@Nullable Status error) {
+      List<Subchannel> activeList = filterNonFailingSubchannels(getSubchannels());
+      helper.updatePicker(new Picker(activeList, error));
+    }
+
+    /**
+     * Filters out non-ready subchannels.
+     */
+    private static List<Subchannel> filterNonFailingSubchannels(
+        Collection<Subchannel> subchannels) {
+      List<Subchannel> readySubchannels = new ArrayList<Subchannel>();
+      for (Subchannel subchannel : subchannels) {
+        if (getSubchannelStateInfoRef(subchannel).get().getState() == READY) {
+          readySubchannels.add(subchannel);
+        }
+      }
+      return readySubchannels;
+    }
+
+    /**
+     * Converts list of {@link ResolvedServerInfoGroup} to {@link EquivalentAddressGroup} set.
+     */
+    private static Set<EquivalentAddressGroup> resolvedServerInfoGroupToEquivalentAddressGroup(
+        List<ResolvedServerInfoGroup> groupList) {
+      Set<EquivalentAddressGroup> addrs = new HashSet<EquivalentAddressGroup>();
+      for (ResolvedServerInfoGroup group : groupList) {
+        for (ResolvedServerInfo server : group.getResolvedServerInfoList()) {
+          addrs.add(new EquivalentAddressGroup(server.getAddress()));
+        }
+      }
+      return addrs;
+    }
+
+    /**
+     * If all subchannels are TRANSIENT_FAILURE, return the Status associated with an arbitrary
+     * subchannel otherwise, return null.
+     */
+    @Nullable
+    private Status getAggregatedError() {
+      Status status = null;
+      for (Subchannel subchannel : getSubchannels()) {
+        ConnectivityStateInfo stateInfo = getSubchannelStateInfoRef(subchannel).get();
+        if (stateInfo.getState() != TRANSIENT_FAILURE) {
+          return null;
+        } else {
+          status = stateInfo.getStatus();
+        }
+      }
+      return status;
+    }
+
+    @VisibleForTesting
+    Collection<Subchannel> getSubchannels() {
+      return subchannels.values();
+    }
+
+    private static AtomicReference<ConnectivityStateInfo> getSubchannelStateInfoRef(
+        Subchannel subchannel) {
+      return checkNotNull(subchannel.getAttributes().get(STATE_INFO), "STATE_INFO");
+    }
+
+    private static <T> Set<T> setsDifference(Set<T> a, Set<T> b) {
+      Set<T> aCopy = new HashSet<T>(a);
+      aCopy.removeAll(b);
+      return aCopy;
+    }
+  }
+
+  @VisibleForTesting
+  static class Picker extends SubchannelPicker {
+    @Nullable
+    final Status status;
+    private final List<Subchannel> list;
+    private int index;
+    private final boolean empty;
+
+    Picker(List<Subchannel> list, @Nullable Status status) {
+      this.empty = list.isEmpty();
+      this.list = list;
+      this.status = status;
+    }
+
+    @Override
+    public PickResult pickSubchannel(Attributes affinity, Metadata headers) {
+      if (!empty) {
+        return PickResult.withSubchannel(nextSubchannel());
+      } else {
+        if (status != null) {
+          return PickResult.withError(status);
+        }
+        return PickResult.withNoResult();
+      }
+    }
+
+    private Subchannel nextSubchannel() {
+      if (empty) {
+        throw new NoSuchElementException();
+      }
+      synchronized (this) {
+        Subchannel val = list.get(index);
+        index++;
+        if (index >= list.size()) {
+          index -= list.size();
+        }
+        return val;
+      }
+    }
+
+    @VisibleForTesting
+    List<Subchannel> getList() {
+      return Collections.unmodifiableList(list);
+    }
+  }
+}
diff --git a/core/src/test/java/io/grpc/PickFirstLoadBalancer2Test.java b/core/src/test/java/io/grpc/PickFirstLoadBalancer2Test.java
new file mode 100644
index 0000000..ef4401d
--- /dev/null
+++ b/core/src/test/java/io/grpc/PickFirstLoadBalancer2Test.java
@@ -0,0 +1,288 @@
+/*
+ * Copyright 2016, Google Inc. All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ *    * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ *    * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ *
+ *    * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+package io.grpc;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Matchers.isA;
+import static org.mockito.Mockito.inOrder;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.reset;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
+import static org.mockito.Mockito.when;
+
+import com.google.common.collect.Lists;
+
+import io.grpc.LoadBalancer2.Helper;
+import io.grpc.LoadBalancer2.PickResult;
+import io.grpc.LoadBalancer2.Subchannel;
+import io.grpc.PickFirstBalancerFactory2.PickFirstBalancer;
+import io.grpc.PickFirstBalancerFactory2.Picker;
+
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Captor;
+import org.mockito.InOrder;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+
+import java.net.SocketAddress;
+import java.util.List;
+
+
+/** Unit test for {@link PickFirstBalancerFactory2}. */
+@RunWith(JUnit4.class)
+public class PickFirstLoadBalancer2Test {
+  private PickFirstBalancer loadBalancer;
+  private List<ResolvedServerInfoGroup> servers = Lists.newArrayList();
+  private List<SocketAddress> socketAddresses = Lists.newArrayList();
+
+  private static Attributes.Key<String> FOO = Attributes.Key.of("foo");
+  private Attributes affinity = Attributes.newBuilder().set(FOO, "bar").build();
+
+  @Captor
+  private ArgumentCaptor<EquivalentAddressGroup> eagCaptor;
+  @Captor
+  private ArgumentCaptor<Picker> pickerCaptor;
+  @Captor
+  private ArgumentCaptor<Attributes> attrsCaptor;
+  @Mock
+  private Helper mockHelper;
+  @Mock
+  private Subchannel mockSubchannel;
+
+  @Before
+  public void setUp() {
+    MockitoAnnotations.initMocks(this);
+    for (int i = 0; i < 3; i++) {
+      SocketAddress addr = new FakeSocketAddress("server" + i);
+      servers.add(ResolvedServerInfoGroup.builder().add(new ResolvedServerInfo(addr)).build());
+      socketAddresses.add(addr);
+    }
+
+    when(mockSubchannel.getAddresses()).thenReturn(new EquivalentAddressGroup(socketAddresses));
+    when(mockHelper.createSubchannel(any(EquivalentAddressGroup.class), any(Attributes.class)))
+        .thenReturn(mockSubchannel);
+
+    loadBalancer = (PickFirstBalancer) PickFirstBalancerFactory2.getInstance().newLoadBalancer(
+        mockHelper);
+  }
+
+  @Test
+  public void pickAfterResolved() throws Exception {
+    loadBalancer.handleResolvedAddresses(servers, affinity);
+
+    verify(mockHelper).createSubchannel(eagCaptor.capture(), attrsCaptor.capture());
+    verify(mockHelper).updatePicker(pickerCaptor.capture());
+
+    assertEquals(new EquivalentAddressGroup(socketAddresses), eagCaptor.getValue());
+    assertEquals(pickerCaptor.getValue().pickSubchannel(affinity, new Metadata()),
+        pickerCaptor.getValue().pickSubchannel(affinity, new Metadata()));
+
+    verifyNoMoreInteractions(mockHelper);
+  }
+
+  @Test
+  public void pickAfterResolvedAndUnchanged() throws Exception {
+    loadBalancer.handleResolvedAddresses(servers, affinity);
+    loadBalancer.handleResolvedAddresses(servers, affinity);
+
+    verify(mockHelper).createSubchannel(any(EquivalentAddressGroup.class),
+        any(Attributes.class));
+    verify(mockHelper).updatePicker(isA(Picker.class));
+
+    verifyNoMoreInteractions(mockHelper);
+  }
+
+  @Test
+  public void pickAfterResolvedAndChanged() throws Exception {
+    SocketAddress socketAddr = new FakeSocketAddress("newserver");
+    List<SocketAddress> newSocketAddresses = Lists.newArrayList(socketAddr);
+    List<ResolvedServerInfoGroup> newServers = Lists.newArrayList(
+        ResolvedServerInfoGroup.builder().add(new ResolvedServerInfo(socketAddr)).build());
+
+    final Subchannel oldSubchannel = mock(Subchannel.class);
+    final EquivalentAddressGroup oldEag = new EquivalentAddressGroup(socketAddresses);
+    when(oldSubchannel.getAddresses()).thenReturn(oldEag);
+
+    final Subchannel newSubchannel = mock(Subchannel.class);
+    final EquivalentAddressGroup newEag = new EquivalentAddressGroup(newSocketAddresses);
+    when(newSubchannel.getAddresses()).thenReturn(newEag);
+
+    when(mockHelper.createSubchannel(eq(oldEag), any(Attributes.class))).thenReturn(oldSubchannel);
+    when(mockHelper.createSubchannel(eq(newEag), any(Attributes.class))).thenReturn(newSubchannel);
+
+    InOrder inOrder = inOrder(mockHelper);
+
+    loadBalancer.handleResolvedAddresses(servers, affinity);
+    inOrder.verify(mockHelper).createSubchannel(eagCaptor.capture(), any(Attributes.class));
+    inOrder.verify(mockHelper).updatePicker(pickerCaptor.capture());
+    assertEquals(socketAddresses, eagCaptor.getValue().getAddresses());
+
+    loadBalancer.handleResolvedAddresses(newServers, affinity);
+    inOrder.verify(mockHelper).createSubchannel(eagCaptor.capture(), any(Attributes.class));
+    inOrder.verify(mockHelper).updatePicker(pickerCaptor.capture());
+    assertEquals(newSocketAddresses, eagCaptor.getValue().getAddresses());
+
+    Subchannel subchannel = pickerCaptor.getAllValues().get(0).pickSubchannel(
+        affinity, new Metadata()).getSubchannel();
+    assertEquals(oldSubchannel, subchannel);
+
+    Subchannel subchannel2 = pickerCaptor.getAllValues().get(1).pickSubchannel(affinity,
+        new Metadata()).getSubchannel();
+    assertEquals(newSubchannel, subchannel2);
+    verify(subchannel2, never()).shutdown();
+
+    verifyNoMoreInteractions(mockHelper);
+  }
+
+  @Test
+  public void stateChangeBeforeResolution() throws Exception {
+    loadBalancer.handleSubchannelState(mockSubchannel,
+        ConnectivityStateInfo.forNonError(ConnectivityState.READY));
+
+    verifyNoMoreInteractions(mockHelper);
+  }
+
+  @Test
+  public void pickAfterStateChangeAfterResolution() throws Exception {
+    loadBalancer.handleResolvedAddresses(servers, affinity);
+    verify(mockHelper).updatePicker(pickerCaptor.capture());
+    Subchannel subchannel = pickerCaptor.getValue().pickSubchannel(affinity,
+        new Metadata()).getSubchannel();
+    reset(mockHelper);
+
+    InOrder inOrder = inOrder(mockHelper);
+
+    Status error = Status.UNAVAILABLE.withDescription("boom!");
+    loadBalancer.handleSubchannelState(subchannel,
+        ConnectivityStateInfo.forTransientFailure(error));
+    inOrder.verify(mockHelper).updatePicker(pickerCaptor.capture());
+    assertEquals(error, pickerCaptor.getValue().pickSubchannel(Attributes.EMPTY,
+            new Metadata()).getStatus());
+
+    loadBalancer.handleSubchannelState(subchannel,
+        ConnectivityStateInfo.forNonError(ConnectivityState.IDLE));
+    inOrder.verify(mockHelper).updatePicker(pickerCaptor.capture());
+    assertEquals(Status.OK, pickerCaptor.getValue().pickSubchannel(Attributes.EMPTY,
+        new Metadata()).getStatus());
+
+    loadBalancer.handleSubchannelState(subchannel,
+        ConnectivityStateInfo.forNonError(ConnectivityState.READY));
+    inOrder.verify(mockHelper).updatePicker(pickerCaptor.capture());
+    assertEquals(subchannel,
+        pickerCaptor.getValue().pickSubchannel(Attributes.EMPTY, new Metadata()).getSubchannel());
+
+    verifyNoMoreInteractions(mockHelper);
+  }
+
+  @Test
+  public void nameResolutionError() throws Exception {
+    Status error = Status.NOT_FOUND.withDescription("nameResolutionError");
+    loadBalancer.handleNameResolutionError(error);
+    verify(mockHelper).updatePicker(pickerCaptor.capture());
+    PickResult pickResult = pickerCaptor.getValue().pickSubchannel(Attributes.EMPTY,
+        new Metadata());
+    assertEquals(null, pickResult.getSubchannel());
+    assertEquals(error, pickResult.getStatus());
+    verifyNoMoreInteractions(mockHelper);
+  }
+
+  @Test
+  public void nameResolutionSuccessAfterError() throws Exception {
+    InOrder inOrder = inOrder(mockHelper);
+
+    loadBalancer.handleNameResolutionError(Status.NOT_FOUND.withDescription("nameResolutionError"));
+    inOrder.verify(mockHelper).updatePicker(any(Picker.class));
+
+    loadBalancer.handleResolvedAddresses(servers, affinity);
+    inOrder.verify(mockHelper).createSubchannel(eq(new EquivalentAddressGroup(socketAddresses)),
+        eq(Attributes.EMPTY));
+    inOrder.verify(mockHelper).updatePicker(pickerCaptor.capture());
+
+    assertEquals(mockSubchannel,
+        pickerCaptor.getValue().pickSubchannel(Attributes.EMPTY, new Metadata()).getSubchannel());
+
+    assertEquals(pickerCaptor.getValue().pickSubchannel(Attributes.EMPTY, new Metadata()),
+        pickerCaptor.getValue().pickSubchannel(Attributes.EMPTY, new Metadata()));
+
+    verifyNoMoreInteractions(mockHelper);
+  }
+
+  @Test
+  public void nameResolutionErrorWithStateChanges() throws Exception {
+    InOrder inOrder = inOrder(mockHelper);
+
+    loadBalancer.handleSubchannelState(mockSubchannel,
+        ConnectivityStateInfo.forTransientFailure(Status.UNAVAILABLE));
+    Status error = Status.NOT_FOUND.withDescription("nameResolutionError");
+    loadBalancer.handleNameResolutionError(error);
+    inOrder.verify(mockHelper).updatePicker(pickerCaptor.capture());
+
+    PickResult pickResult = pickerCaptor.getValue().pickSubchannel(Attributes.EMPTY,
+        new Metadata());
+    assertEquals(null, pickResult.getSubchannel());
+    assertEquals(error, pickResult.getStatus());
+
+    loadBalancer.handleSubchannelState(mockSubchannel,
+        ConnectivityStateInfo.forNonError(ConnectivityState.READY));
+    Status error2 = Status.NOT_FOUND.withDescription("nameResolutionError2");
+    loadBalancer.handleNameResolutionError(error2);
+    inOrder.verify(mockHelper).updatePicker(pickerCaptor.capture());
+
+    pickResult = pickerCaptor.getValue().pickSubchannel(Attributes.EMPTY,
+        new Metadata());
+    assertEquals(null, pickResult.getSubchannel());
+    assertEquals(error2, pickResult.getStatus());
+
+    verifyNoMoreInteractions(mockHelper);
+  }
+
+  private static class FakeSocketAddress extends SocketAddress {
+    final String name;
+
+    FakeSocketAddress(String name) {
+      this.name = name;
+    }
+
+    @Override
+    public String toString() {
+      return "FakeSocketAddress-" + name;
+    }
+  }
+}
diff --git a/core/src/test/java/io/grpc/util/RoundRobinLoadBalancer2Test.java b/core/src/test/java/io/grpc/util/RoundRobinLoadBalancer2Test.java
new file mode 100644
index 0000000..fe12e1b
--- /dev/null
+++ b/core/src/test/java/io/grpc/util/RoundRobinLoadBalancer2Test.java
@@ -0,0 +1,380 @@
+/*
+ * Copyright 2016, Google Inc. All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ *    * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ *    * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ *
+ *    * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+package io.grpc.util;
+
+import static com.google.common.truth.Truth.assertThat;
+import static io.grpc.ConnectivityState.IDLE;
+import static io.grpc.ConnectivityState.READY;
+import static io.grpc.util.RoundRobinLoadBalancerFactory2.RoundRobinLoadBalancer.STATE_INFO;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.isA;
+import static org.mockito.Mockito.inOrder;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
+import static org.mockito.Mockito.when;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
+import io.grpc.Attributes;
+import io.grpc.ConnectivityState;
+import io.grpc.ConnectivityStateInfo;
+import io.grpc.EquivalentAddressGroup;
+import io.grpc.LoadBalancer2;
+import io.grpc.LoadBalancer2.Helper;
+import io.grpc.LoadBalancer2.Subchannel;
+import io.grpc.Metadata;
+import io.grpc.ResolvedServerInfo;
+import io.grpc.ResolvedServerInfoGroup;
+import io.grpc.Status;
+import io.grpc.util.RoundRobinLoadBalancerFactory2.Picker;
+import io.grpc.util.RoundRobinLoadBalancerFactory2.RoundRobinLoadBalancer;
+
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Captor;
+import org.mockito.InOrder;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+import java.net.SocketAddress;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicReference;
+
+/** Unit test for {@link RoundRobinLoadBalancerFactory2}. */
+@RunWith(JUnit4.class)
+public class RoundRobinLoadBalancer2Test {
+  private RoundRobinLoadBalancer loadBalancer;
+  private Map<ResolvedServerInfoGroup, EquivalentAddressGroup> servers = Maps.newHashMap();
+  private Map<EquivalentAddressGroup, Subchannel> subchannels = Maps.newLinkedHashMap();
+  private static Attributes.Key<String> MAJOR_KEY = Attributes.Key.of("major-key");
+  private Attributes affinity = Attributes.newBuilder().set(MAJOR_KEY, "I got the keys").build();
+
+  @Captor
+  private ArgumentCaptor<Picker> pickerCaptor;
+  @Captor
+  private ArgumentCaptor<EquivalentAddressGroup> eagCaptor;
+  @Captor
+  private ArgumentCaptor<Attributes> attrsCaptor;
+  @Mock
+  private Helper mockHelper;
+  @Mock
+  private Subchannel mockSubchannel;
+
+  @Before
+  public void setUp() {
+    MockitoAnnotations.initMocks(this);
+
+    for (int i = 0; i < 3; i++) {
+      SocketAddress addr = new FakeSocketAddress("server" + i);
+      EquivalentAddressGroup eag = new EquivalentAddressGroup(addr);
+      servers.put(ResolvedServerInfoGroup.builder().add(new ResolvedServerInfo(addr)).build(), eag);
+      subchannels.put(eag, createMockSubchannel());
+    }
+
+    when(mockHelper.createSubchannel(any(EquivalentAddressGroup.class), any(Attributes.class)))
+        .then(new Answer<Subchannel>() {
+          @Override
+          public Subchannel answer(InvocationOnMock invocation) throws Throwable {
+            Object[] args = invocation.getArguments();
+            return subchannels.get(args[0]);
+          }
+        });
+
+    loadBalancer = (RoundRobinLoadBalancer) RoundRobinLoadBalancerFactory2.getInstance()
+        .newLoadBalancer(mockHelper);
+  }
+
+  @Test
+  public void pickAfterResolved() throws Exception {
+    Subchannel readySubchannel = subchannels.get(servers.get(servers.keySet().iterator().next()));
+    when(readySubchannel.getAttributes()).thenReturn(Attributes.newBuilder()
+        .set(STATE_INFO, new AtomicReference<ConnectivityStateInfo>(
+            ConnectivityStateInfo.forNonError(READY)))
+        .build());
+    loadBalancer.handleResolvedAddresses(Lists.newArrayList(servers.keySet()), affinity);
+
+    verify(mockHelper, times(3)).createSubchannel(eagCaptor.capture(),
+        any(Attributes.class));
+
+    assertThat(eagCaptor.getAllValues()).containsAllIn(subchannels.keySet());
+    for (Subchannel subchannel : subchannels.values()) {
+      verify(subchannel).requestConnection();
+      verify(subchannel, never()).shutdown();
+    }
+
+    verify(mockHelper, times(1)).updatePicker(pickerCaptor.capture());
+
+    assertThat(pickerCaptor.getValue().getList()).containsExactly(readySubchannel);
+
+    verifyNoMoreInteractions(mockHelper);
+  }
+
+  @Test
+  public void pickAfterResolvedUpdatedHosts() throws Exception {
+    Subchannel removedSubchannel = mock(Subchannel.class);
+    Subchannel oldSubchannel = mock(Subchannel.class);
+    Subchannel newSubchannel = mock(Subchannel.class);
+
+    for (Subchannel subchannel : Lists.newArrayList(removedSubchannel, oldSubchannel,
+        newSubchannel)) {
+      when(subchannel.getAttributes()).thenReturn(Attributes.newBuilder().set(STATE_INFO,
+          new AtomicReference<ConnectivityStateInfo>(
+              ConnectivityStateInfo.forNonError(READY))).build());
+    }
+
+    FakeSocketAddress removedAddr = new FakeSocketAddress("removed");
+    FakeSocketAddress oldAddr = new FakeSocketAddress("old");
+    FakeSocketAddress newAddr = new FakeSocketAddress("new");
+
+    final Map<EquivalentAddressGroup, Subchannel> subchannels2 = Maps.newHashMap();
+    subchannels2.put(new EquivalentAddressGroup(removedAddr), removedSubchannel);
+    subchannels2.put(new EquivalentAddressGroup(oldAddr), oldSubchannel);
+
+    List<ResolvedServerInfoGroup> currentServers = Lists.newArrayList(
+        ResolvedServerInfoGroup.builder()
+            .add(new ResolvedServerInfo(removedAddr))
+            .add(new ResolvedServerInfo(oldAddr))
+            .build());
+
+    when(mockHelper.createSubchannel(any(EquivalentAddressGroup.class), any(Attributes.class)))
+        .then(new Answer<Subchannel>() {
+          @Override
+          public Subchannel answer(InvocationOnMock invocation) throws Throwable {
+            Object[] args = invocation.getArguments();
+            return subchannels2.get(args[0]);
+          }
+        });
+
+    loadBalancer.handleResolvedAddresses(currentServers, affinity);
+
+    InOrder inOrder = inOrder(mockHelper);
+
+    inOrder.verify(mockHelper).updatePicker(pickerCaptor.capture());
+    Picker picker = pickerCaptor.getValue();
+    assertNull(picker.status);
+    assertThat(picker.getList()).containsExactly(removedSubchannel, oldSubchannel);
+
+    verify(removedSubchannel, times(1)).requestConnection();
+    verify(oldSubchannel, times(1)).requestConnection();
+
+    assertThat(loadBalancer.getSubchannels()).containsExactly(removedSubchannel,
+        oldSubchannel);
+
+    subchannels2.clear();
+    subchannels2.put(new EquivalentAddressGroup(oldAddr), oldSubchannel);
+    subchannels2.put(new EquivalentAddressGroup(newAddr), newSubchannel);
+
+    List<ResolvedServerInfoGroup> latestServers = Lists.newArrayList(
+        ResolvedServerInfoGroup.builder()
+            .add(new ResolvedServerInfo(oldAddr))
+            .add(new ResolvedServerInfo(newAddr))
+            .build());
+
+    loadBalancer.handleResolvedAddresses(latestServers, affinity);
+
+    verify(newSubchannel, times(1)).requestConnection();
+    verify(removedSubchannel, times(1)).shutdown();
+
+    assertThat(loadBalancer.getSubchannels()).containsExactly(oldSubchannel,
+        newSubchannel);
+
+    verify(mockHelper, times(3)).createSubchannel(any(EquivalentAddressGroup.class),
+        any(Attributes.class));
+    inOrder.verify(mockHelper).updatePicker(pickerCaptor.capture());
+
+    picker = pickerCaptor.getValue();
+    assertNull(picker.status);
+    assertThat(picker.getList()).containsExactly(oldSubchannel, newSubchannel);
+
+    verifyNoMoreInteractions(mockHelper);
+  }
+
+  @Test
+  public void pickAfterStateChangeBeforeResolution() throws Exception {
+    loadBalancer.handleSubchannelState(mockSubchannel,
+        ConnectivityStateInfo.forNonError(ConnectivityState.READY));
+    verifyNoMoreInteractions(mockSubchannel);
+    verifyNoMoreInteractions(mockHelper);
+  }
+
+  @Test
+  public void pickAfterStateChangeAndResolutionError() throws Exception {
+    loadBalancer.handleSubchannelState(mockSubchannel,
+        ConnectivityStateInfo.forNonError(ConnectivityState.READY));
+    verifyNoMoreInteractions(mockSubchannel);
+    verifyNoMoreInteractions(mockHelper);
+  }
+
+  @Test
+  public void pickAfterStateChange() throws Exception {
+    InOrder inOrder = inOrder(mockHelper);
+    when(mockHelper.createSubchannel(any(EquivalentAddressGroup.class), any(Attributes.class)))
+        .then(new Answer<Subchannel>() {
+          @Override
+          public Subchannel answer(InvocationOnMock invocation) throws Throwable {
+            return createMockSubchannel();
+          }
+        });
+    loadBalancer.handleResolvedAddresses(Lists.newArrayList(servers.keySet()), Attributes.EMPTY);
+    Subchannel subchannel = loadBalancer.getSubchannels().iterator().next();
+    AtomicReference<ConnectivityStateInfo> subchannelStateInfo = subchannel.getAttributes().get(
+        STATE_INFO);
+
+    inOrder.verify(mockHelper).updatePicker(isA(Picker.class));
+    assertThat(subchannelStateInfo.get()).isEqualTo(ConnectivityStateInfo.forNonError(IDLE));
+
+    loadBalancer.handleSubchannelState(subchannel,
+        ConnectivityStateInfo.forNonError(ConnectivityState.READY));
+    inOrder.verify(mockHelper).updatePicker(pickerCaptor.capture());
+    assertNull(pickerCaptor.getValue().status);
+    assertThat(subchannelStateInfo.get()).isEqualTo(
+        ConnectivityStateInfo.forNonError(ConnectivityState.READY));
+
+    Status error = Status.UNKNOWN.withDescription("¯\\_(ツ)_//¯");
+    loadBalancer.handleSubchannelState(subchannel,
+        ConnectivityStateInfo.forTransientFailure(error));
+    assertThat(subchannelStateInfo.get()).isEqualTo(
+        ConnectivityStateInfo.forTransientFailure(error));
+    inOrder.verify(mockHelper).updatePicker(pickerCaptor.capture());
+    assertNull(pickerCaptor.getValue().status);
+
+    loadBalancer.handleSubchannelState(subchannel,
+        ConnectivityStateInfo.forNonError(ConnectivityState.IDLE));
+    inOrder.verify(mockHelper).updatePicker(pickerCaptor.capture());
+    assertNull(pickerCaptor.getValue().status);
+    assertThat(subchannelStateInfo.get()).isEqualTo(
+        ConnectivityStateInfo.forNonError(ConnectivityState.IDLE));
+
+    verify(subchannel, times(2)).requestConnection();
+    verify(mockHelper, times(3)).createSubchannel(any(EquivalentAddressGroup.class),
+        any(Attributes.class));
+    verifyNoMoreInteractions(mockHelper);
+  }
+
+  @Test
+  public void pickerRoundRobin() throws Exception {
+    Subchannel subchannel = mock(Subchannel.class);
+    Subchannel subchannel1 = mock(Subchannel.class);
+    Subchannel subchannel2 = mock(Subchannel.class);
+
+    Picker picker = new Picker(Collections.unmodifiableList(
+        Lists.<Subchannel>newArrayList(subchannel, subchannel1, subchannel2)), null);
+
+    assertThat(picker.getList()).containsExactly(subchannel, subchannel1, subchannel2);
+
+    assertEquals(subchannel,
+        picker.pickSubchannel(Attributes.EMPTY, new Metadata()).getSubchannel());
+    assertEquals(subchannel1,
+        picker.pickSubchannel(Attributes.EMPTY, new Metadata()).getSubchannel());
+    assertEquals(subchannel2,
+        picker.pickSubchannel(Attributes.EMPTY, new Metadata()).getSubchannel());
+    assertEquals(subchannel,
+        picker.pickSubchannel(Attributes.EMPTY, new Metadata()).getSubchannel());
+  }
+
+  @Test
+  public void pickerEmptyList() throws Exception {
+    Picker picker = new Picker(Lists.<Subchannel>newArrayList(), Status.UNKNOWN);
+
+    assertEquals(null, picker.pickSubchannel(Attributes.EMPTY, new Metadata()).getSubchannel());
+    assertEquals(Status.UNKNOWN,
+        picker.pickSubchannel(Attributes.EMPTY, new Metadata()).getStatus());
+  }
+
+  @Test
+  public void nameResolutionErrorWithNoChannels() throws Exception {
+    Status error = Status.NOT_FOUND.withDescription("nameResolutionError");
+    loadBalancer.handleNameResolutionError(error);
+    verify(mockHelper).updatePicker(pickerCaptor.capture());
+    LoadBalancer2.PickResult pickResult = pickerCaptor.getValue().pickSubchannel(Attributes.EMPTY,
+        new Metadata());
+    assertNull(pickResult.getSubchannel());
+    assertEquals(error, pickResult.getStatus());
+    verifyNoMoreInteractions(mockHelper);
+  }
+
+  @Test
+  public void nameResolutionErrorWithActiveChannels() throws Exception {
+    Subchannel readySubchannel = subchannels.values().iterator().next();
+    readySubchannel.getAttributes().get(STATE_INFO).set(ConnectivityStateInfo.forNonError(READY));
+    loadBalancer.handleResolvedAddresses(Lists.newArrayList(servers.keySet()), affinity);
+    loadBalancer.handleNameResolutionError(Status.NOT_FOUND.withDescription("nameResolutionError"));
+
+    verify(mockHelper, times(3)).createSubchannel(any(EquivalentAddressGroup.class),
+        any(Attributes.class));
+    verify(mockHelper, times(2)).updatePicker(pickerCaptor.capture());
+
+    LoadBalancer2.PickResult pickResult = pickerCaptor.getValue().pickSubchannel(Attributes.EMPTY,
+        new Metadata());
+    assertEquals(readySubchannel, pickResult.getSubchannel());
+    assertEquals(Status.OK.getCode(), pickResult.getStatus().getCode());
+
+    LoadBalancer2.PickResult pickResult2 = pickerCaptor.getValue().pickSubchannel(Attributes.EMPTY,
+        new Metadata());
+    assertEquals(readySubchannel, pickResult2.getSubchannel());
+    verifyNoMoreInteractions(mockHelper);
+  }
+
+  private Subchannel createMockSubchannel() {
+    Subchannel subchannel = mock(Subchannel.class);
+    when(subchannel.getAttributes()).thenReturn(Attributes.newBuilder().set(STATE_INFO,
+        new AtomicReference<ConnectivityStateInfo>(
+            ConnectivityStateInfo.forNonError(IDLE))).build());
+    return subchannel;
+  }
+
+  private static class FakeSocketAddress extends SocketAddress {
+    final String name;
+
+    FakeSocketAddress(String name) {
+      this.name = name;
+    }
+
+    @Override
+    public String toString() {
+      return "FakeSocketAddress-" + name;
+    }
+  }
+}