Merge "Updating Routing and subscription state to support subscription to layer from specific publisher"
diff --git a/car-lib/src/android/car/vms/VmsSubscriptionState.java b/car-lib/src/android/car/vms/VmsSubscriptionState.java
index 0e36fb1..0e7e4fa 100644
--- a/car-lib/src/android/car/vms/VmsSubscriptionState.java
+++ b/car-lib/src/android/car/vms/VmsSubscriptionState.java
@@ -19,9 +19,13 @@
 import android.car.annotation.FutureFeature;
 import android.os.Parcel;
 import android.os.Parcelable;
+
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.HashSet;
 import java.util.List;
+import java.util.Set;
+
 
 /**
  * The list of layers with subscribers.
@@ -31,30 +35,44 @@
 @FutureFeature
 public final class VmsSubscriptionState implements Parcelable {
     private final int mSequenceNumber;
-    private final List<VmsLayer> mLayers;
+    private final Set<VmsLayer> mLayers;
+    private final Set<VmsAssociatedLayer> mSubscribedLayersFromPublishers;
 
     /**
-     * Construct a dependency for layer on other layers.
+     * Construcs a summary of the state of the current subscriptions for publishers to consume
+     * and adjust which layers that the are publishing.
      */
-    public VmsSubscriptionState(int sequenceNumber, List<VmsLayer> dependencies) {
+    public VmsSubscriptionState(int sequenceNumber,
+                                Set<VmsLayer> subscribedLayers,
+                                Set<VmsAssociatedLayer> layersFromPublishers) {
         mSequenceNumber = sequenceNumber;
-        mLayers = Collections.unmodifiableList(dependencies);
+        mLayers = Collections.unmodifiableSet(subscribedLayers);
+        mSubscribedLayersFromPublishers = Collections.unmodifiableSet(layersFromPublishers);
     }
 
     public int getSequenceNumber() {
         return mSequenceNumber;
     }
 
-    public List<VmsLayer> getLayers() {
+    public Set<VmsLayer> getSubscribedLayersFromAll() {
         return mLayers;
     }
 
+    public Set<VmsAssociatedLayer> getSubscribedLayersFromPublishers() {
+        return mSubscribedLayersFromPublishers;
+    }
+
     @Override
     public String toString() {
         StringBuilder sb = new StringBuilder();
         sb.append("sequence number=").append(mSequenceNumber);
         sb.append("; layers={");
-        for(VmsLayer layer : mLayers) {
+        for (VmsLayer layer : mLayers) {
+            sb.append(layer).append(",");
+        }
+        sb.append("}");
+        sb.append("; associatedLayers={");
+        for (VmsAssociatedLayer layer : mSubscribedLayersFromPublishers) {
             sb.append(layer).append(",");
         }
         sb.append("}");
@@ -62,19 +80,21 @@
     }
 
     public static final Parcelable.Creator<VmsSubscriptionState> CREATOR = new
-        Parcelable.Creator<VmsSubscriptionState>() {
-            public VmsSubscriptionState createFromParcel(Parcel in) {
-                return new VmsSubscriptionState(in);
-            }
-            public VmsSubscriptionState[] newArray(int size) {
-                return new VmsSubscriptionState[size];
-            }
-        };
+            Parcelable.Creator<VmsSubscriptionState>() {
+                public VmsSubscriptionState createFromParcel(Parcel in) {
+                    return new VmsSubscriptionState(in);
+                }
+
+                public VmsSubscriptionState[] newArray(int size) {
+                    return new VmsSubscriptionState[size];
+                }
+            };
 
     @Override
     public void writeToParcel(Parcel out, int flags) {
         out.writeInt(mSequenceNumber);
-        out.writeParcelableList(mLayers, flags);
+        out.writeParcelableList(new ArrayList(mLayers), flags);
+        out.writeParcelableList(new ArrayList(mSubscribedLayersFromPublishers), flags);
     }
 
     @Override
@@ -84,8 +104,13 @@
 
     private VmsSubscriptionState(Parcel in) {
         mSequenceNumber = in.readInt();
+
         List<VmsLayer> layers = new ArrayList<>();
         in.readParcelableList(layers, VmsLayer.class.getClassLoader());
-        mLayers = Collections.unmodifiableList(layers);
+        mLayers = Collections.unmodifiableSet(new HashSet(layers));
+
+        List<VmsAssociatedLayer> associatedLayers = new ArrayList<>();
+        in.readParcelableList(associatedLayers, VmsAssociatedLayer.class.getClassLoader());
+        mSubscribedLayersFromPublishers = Collections.unmodifiableSet(new HashSet(associatedLayers));
     }
 }
\ No newline at end of file
diff --git a/service/src/com/android/car/VmsRouting.java b/service/src/com/android/car/VmsRouting.java
index 1cc56f1..6f5cbe4 100644
--- a/service/src/com/android/car/VmsRouting.java
+++ b/service/src/com/android/car/VmsRouting.java
@@ -18,6 +18,7 @@
 
 import android.car.annotation.FutureFeature;
 import android.car.vms.IVmsSubscriberClient;
+import android.car.vms.VmsAssociatedLayer;
 import android.car.vms.VmsLayer;
 import android.car.vms.VmsOperationRecorder;
 import android.car.vms.VmsSubscriptionState;
@@ -30,6 +31,7 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.stream.Collectors;
 
 /**
  * Manages all the VMS subscriptions:
@@ -43,19 +45,27 @@
     // A map of Layer + Version to listeners.
     @GuardedBy("mLock")
     private Map<VmsLayer, Set<IVmsSubscriberClient>> mLayerSubscriptions = new HashMap<>();
+
+    @GuardedBy("mLock")
+    private Map<VmsLayer, Map<Integer, Set<IVmsSubscriberClient>>> mLayerSubscriptionsToPublishers =
+            new HashMap<>();
     // A set of listeners that are interested in any layer + version.
     @GuardedBy("mLock")
     private Set<IVmsSubscriberClient> mPromiscuousSubscribers = new HashSet<>();
+
     // A set of all the layers + versions the HAL is subscribed to.
     @GuardedBy("mLock")
     private Set<VmsLayer> mHalSubscriptions = new HashSet<>();
+
+    @GuardedBy("mLock")
+    private Map<VmsLayer, Set<Integer>> mHalSubscriptionsToPublishers = new HashMap<>();
     // A sequence number that is increased every time the subscription state is modified. Note that
     // modifying the list of promiscuous subscribers does not affect the subscription state.
     @GuardedBy("mLock")
     private int mSequenceNumber = 0;
 
     /**
-     * Add a listener subscription to a data messages from layer + version.
+     * Add a listener subscription to data messages from a VMS layer.
      *
      * @param listener a VMS subscriber.
      * @param layer    the layer subscribing to.
@@ -91,6 +101,38 @@
     }
 
     /**
+     * Add a listener subscription to data messages from a VMS layer from a specific publisher.
+     *
+     * @param listener    a VMS subscriber.
+     * @param layer       the layer to subscribing to.
+     * @param publisherId the publisher ID.
+     */
+    public void addSubscription(IVmsSubscriberClient listener, VmsLayer layer, int publisherId) {
+        synchronized (mLock) {
+            ++mSequenceNumber;
+
+            Map<Integer, Set<IVmsSubscriberClient>> publisherIdsToListenersForLayer =
+                    mLayerSubscriptionsToPublishers.get(layer);
+
+            if (publisherIdsToListenersForLayer == null) {
+                publisherIdsToListenersForLayer = new HashMap<>();
+                mLayerSubscriptionsToPublishers.put(layer, publisherIdsToListenersForLayer);
+            }
+
+            Set<IVmsSubscriberClient> listenersForPublisher =
+                    publisherIdsToListenersForLayer.get(publisherId);
+
+            if (listenersForPublisher == null) {
+                listenersForPublisher = new HashSet<>();
+                publisherIdsToListenersForLayer.put(publisherId, listenersForPublisher);
+            }
+
+            // Add the listener to the list.
+            listenersForPublisher.add(listener);
+        }
+    }
+
+    /**
      * Remove a subscription for a layer + version and make sure to remove the key if there are no
      * more subscribers.
      *
@@ -130,6 +172,41 @@
     }
 
     /**
+     * Remove a subscription to data messages from a VMS layer from a specific publisher.
+     *
+     * @param listener    a VMS subscriber.
+     * @param layer       the layer to unsubscribing from.
+     * @param publisherId the publisher ID.
+     */
+    public void removeSubscription(IVmsSubscriberClient listener, VmsLayer layer, int publisherId) {
+        synchronized (mLock) {
+            ++mSequenceNumber;
+
+            Map<Integer, Set<IVmsSubscriberClient>> listenersToPublishers =
+                    mLayerSubscriptionsToPublishers.get(layer);
+
+            if (listenersToPublishers == null) {
+                return;
+            }
+
+            Set<IVmsSubscriberClient> listeners = listenersToPublishers.get(publisherId);
+
+            if (listeners == null) {
+                return;
+            }
+            listeners.remove(listener);
+
+            if (listeners.isEmpty()) {
+                listenersToPublishers.remove(publisherId);
+            }
+
+            if (listenersToPublishers.isEmpty()) {
+                mLayerSubscriptionsToPublishers.remove(layer);
+            }
+        }
+    }
+
+    /**
      * Remove a subscriber from all routes (optional operation).
      *
      * @param listener a VMS subscriber.
@@ -212,6 +289,19 @@
         }
     }
 
+    public void addHalSubscriptionToPublisher(VmsLayer layer, int publisherId) {
+        synchronized (mLock) {
+            ++mSequenceNumber;
+
+            Set<Integer> publisherIdsForLayer = mHalSubscriptionsToPublishers.get(layer);
+            if (publisherIdsForLayer == null) {
+                publisherIdsForLayer = new HashSet<>();
+                mHalSubscriptionsToPublishers.put(layer, publisherIdsForLayer);
+            }
+            publisherIdsForLayer.add(publisherId);
+        }
+    }
+
     /**
      * remove a layer and version to the HAL subscriptions.
      *
@@ -225,9 +315,26 @@
         }
     }
 
+    public void removeHalSubscriptionToPublisher(VmsLayer layer, int publisherId) {
+        synchronized (mLock) {
+            ++mSequenceNumber;
+
+            Set<Integer> publisherIdsForLayer = mHalSubscriptionsToPublishers.get(layer);
+            if (publisherIdsForLayer == null) {
+                return;
+            }
+            publisherIdsForLayer.remove(publisherId);
+
+            if (publisherIdsForLayer.isEmpty()) {
+                mHalSubscriptionsToPublishers.remove(layer);
+            }
+        }
+    }
+
     /**
      * checks if the HAL is subscribed to a layer.
      *
+     * @param layer
      * @return true if the HAL is subscribed to layer.
      */
     public boolean isHalSubscribed(VmsLayer layer) {
@@ -239,6 +346,7 @@
     /**
      * checks if there are subscribers to a layer.
      *
+     * @param layer
      * @return true if there are subscribers to layer.
      */
     public boolean hasLayerSubscriptions(VmsLayer layer) {
@@ -248,14 +356,46 @@
     }
 
     /**
+     * returns true if there is already a subscription for the layer from publisherId.
+     *
+     * @param layer
+     * @param publisherId
+     * @return
+     */
+    public boolean hasLayerFromPublisherSubscriptions(VmsLayer layer, int publisherId) {
+        synchronized (mLock) {
+            boolean hasClientSubscription =
+                    mLayerSubscriptionsToPublishers.containsKey(layer) &&
+                            mLayerSubscriptionsToPublishers.get(layer).containsKey(publisherId);
+
+            boolean hasHalSubscription = mHalSubscriptionsToPublishers.containsKey(layer) &&
+                    mHalSubscriptionsToPublishers.get(layer).contains(publisherId);
+
+            return hasClientSubscription || hasHalSubscription;
+        }
+    }
+
+    /**
      * @return a Set of layers and versions which VMS clients are subscribed to.
      */
     public VmsSubscriptionState getSubscriptionState() {
         synchronized (mLock) {
-            List<VmsLayer> layers = new ArrayList<>();
+            Set<VmsLayer> layers = new HashSet<>();
             layers.addAll(mLayerSubscriptions.keySet());
             layers.addAll(mHalSubscriptions);
-            return new VmsSubscriptionState(mSequenceNumber, layers);
+
+
+            Set<VmsAssociatedLayer> layersFromPublishers = new HashSet<>();
+            layersFromPublishers.addAll(mLayerSubscriptionsToPublishers.entrySet()
+                    .stream()
+                    .map(e -> new VmsAssociatedLayer(e.getKey(), e.getValue().keySet()))
+                    .collect(Collectors.toSet()));
+            layersFromPublishers.addAll(mHalSubscriptionsToPublishers.entrySet()
+                    .stream()
+                    .map(e -> new VmsAssociatedLayer(e.getKey(), e.getValue()))
+                    .collect(Collectors.toSet()));
+
+            return new VmsSubscriptionState(mSequenceNumber, layers, layersFromPublishers);
         }
     }
 }
\ No newline at end of file
diff --git a/service/src/com/android/car/hal/VmsHalService.java b/service/src/com/android/car/hal/VmsHalService.java
index 2e42748..bb33cb6 100644
--- a/service/src/com/android/car/hal/VmsHalService.java
+++ b/service/src/com/android/car/hal/VmsHalService.java
@@ -563,7 +563,7 @@
                 toTypedVmsVehiclePropValue(VmsMessageType.SUBSCRIPTIONS_RESPONSE);
         VehiclePropValue.RawValue v = vehicleProp.value;
         v.int32Values.add(subscription.getSequenceNumber());
-        List<VmsLayer> layers = subscription.getLayers();
+        Set<VmsLayer> layers = subscription.getSubscribedLayersFromAll();
         v.int32Values.add(layers.size());
         for (VmsLayer layer : layers) {
             v.int32Values.add(layer.getId());
diff --git a/tests/VmsPublisherClientSample/src/com/google/android/car/vms/publisher/VmsPublisherClientSampleService.java b/tests/VmsPublisherClientSample/src/com/google/android/car/vms/publisher/VmsPublisherClientSampleService.java
index e6393b3..46d08a0 100644
--- a/tests/VmsPublisherClientSample/src/com/google/android/car/vms/publisher/VmsPublisherClientSampleService.java
+++ b/tests/VmsPublisherClientSample/src/com/google/android/car/vms/publisher/VmsPublisherClientSampleService.java
@@ -57,7 +57,7 @@
     @Override
     public void onVmsSubscriptionChange(VmsSubscriptionState subscriptionState) {
         if (mInitialized.compareAndSet(false, true)) {
-            for (VmsLayer layer : subscriptionState.getLayers()) {
+            for (VmsLayer layer : subscriptionState.getSubscribedLayersFromAll()) {
                 if (layer.equals(TEST_LAYER)) {
                     mHandler.sendEmptyMessage(PUBLISH_EVENT);
                 }
diff --git a/tests/carservice_test/src/com/android/car/test/VmsPublisherClientMockService.java b/tests/carservice_test/src/com/android/car/test/VmsPublisherClientMockService.java
index 62c8782..aa90d6a 100644
--- a/tests/carservice_test/src/com/android/car/test/VmsPublisherClientMockService.java
+++ b/tests/carservice_test/src/com/android/car/test/VmsPublisherClientMockService.java
@@ -59,7 +59,7 @@
     }
 
     private void publishIfNeeded(VmsSubscriptionState subscriptionState) {
-        for (VmsLayer layer : subscriptionState.getLayers()) {
+        for (VmsLayer layer : subscriptionState.getSubscribedLayersFromAll()) {
             if (layer.equals(VmsPublisherSubscriberTest.LAYER)) {
                 publish(VmsPublisherSubscriberTest.LAYER, VmsPublisherSubscriberTest.PAYLOAD);
             }
@@ -68,7 +68,7 @@
 
     private void declareOffering(VmsSubscriptionState subscriptionState, int publisherId) {
         List<VmsLayerDependency> dependencies = new ArrayList<>();
-        for( VmsLayer layer : subscriptionState.getLayers()) {
+        for( VmsLayer layer : subscriptionState.getSubscribedLayersFromAll()) {
             dependencies.add(new VmsLayerDependency(layer));
         }
         VmsLayersOffering offering = new VmsLayersOffering(dependencies, publisherId);
diff --git a/tests/carservice_unit_test/src/com/android/car/VmsRoutingTest.java b/tests/carservice_unit_test/src/com/android/car/VmsRoutingTest.java
index f7725b8..0bfc595 100644
--- a/tests/carservice_unit_test/src/com/android/car/VmsRoutingTest.java
+++ b/tests/carservice_unit_test/src/com/android/car/VmsRoutingTest.java
@@ -23,15 +23,21 @@
 import android.test.AndroidTestCase;
 import android.test.suitebuilder.annotation.SmallTest;
 
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
+import java.util.Map;
 
 @SmallTest
 public class VmsRoutingTest extends AndroidTestCase {
-    private static VmsLayer LAYER_WITH_SUBSCRIPTION_1= new VmsLayer(1, 2, 1);
-    private static VmsLayer LAYER_WITH_SUBSCRIPTION_2= new VmsLayer(1, 3, 3);
-    private static VmsLayer LAYER_WITHOUT_SUBSCRIPTION= new VmsLayer(1, 4, 7);
+    private static VmsLayer LAYER_WITH_SUBSCRIPTION_1 = new VmsLayer(1, 2, 1);
+    private static VmsLayer LAYER_WITH_SUBSCRIPTION_2 = new VmsLayer(1, 3, 3);
+    private static VmsLayer LAYER_WITHOUT_SUBSCRIPTION = new VmsLayer(1, 4, 7);
+    private static int PUBLISHER_ID_1 = 123;
+    private static int PUBLISHER_ID_2 = 456;
     private VmsRouting mRouting;
 
     @Override
@@ -54,11 +60,12 @@
         expectedSubscriptions.add(LAYER_WITH_SUBSCRIPTION_2);
         VmsSubscriptionState subscriptionState = mRouting.getSubscriptionState();
         assertEquals(2, subscriptionState.getSequenceNumber());
-        assertEquals(expectedSubscriptions, new HashSet<>(subscriptionState.getLayers()));
+        assertEquals(expectedSubscriptions,
+                new HashSet<>(subscriptionState.getSubscribedLayersFromAll()));
 
         // Verify there is only a single listener.
         assertEquals(1,
-            mRouting.getListeners(LAYER_WITH_SUBSCRIPTION_1).size());
+                mRouting.getListeners(LAYER_WITH_SUBSCRIPTION_1).size());
     }
 
     public void testAddingSubscribersAndHalLayersWithOverlap() throws Exception {
@@ -76,7 +83,8 @@
         expectedSubscriptions.add(LAYER_WITH_SUBSCRIPTION_2);
         VmsSubscriptionState subscriptionState = mRouting.getSubscriptionState();
         assertEquals(3, subscriptionState.getSequenceNumber());
-        assertEquals(expectedSubscriptions, new HashSet<>(subscriptionState.getLayers()));
+        assertEquals(expectedSubscriptions,
+                new HashSet<>(subscriptionState.getSubscribedLayersFromAll()));
     }
 
     public void testAddingAndRemovingLayers() throws Exception {
@@ -96,7 +104,7 @@
         // Verify there are no subscribers in the routing manager.
         VmsSubscriptionState subscriptionState = mRouting.getSubscriptionState();
         assertEquals(4, subscriptionState.getSequenceNumber());
-        assertTrue(subscriptionState.getLayers().isEmpty());
+        assertTrue(subscriptionState.getSubscribedLayersFromAll().isEmpty());
     }
 
     public void testAddingBothTypesOfSubscribers() throws Exception {
@@ -106,18 +114,18 @@
 
         // Add a subscription without a layer.
         MockVmsListener listenerWithoutLayer = new MockVmsListener();
-        mRouting.addSubscription(listenerWithoutLayer );
+        mRouting.addSubscription(listenerWithoutLayer);
 
         // Verify 2 subscribers for the layer.
         assertEquals(2,
-            mRouting.getListeners(LAYER_WITH_SUBSCRIPTION_1).size());
+                mRouting.getListeners(LAYER_WITH_SUBSCRIPTION_1).size());
 
         // Add the listener with layer as also a listener without layer
         mRouting.addSubscription(listenerForLayer);
 
         // The number of listeners for the layer should remain the same as before.
         assertEquals(2,
-            mRouting.getListeners(LAYER_WITH_SUBSCRIPTION_1).size());
+                mRouting.getListeners(LAYER_WITH_SUBSCRIPTION_1).size());
     }
 
     public void testOnlyRelevantSubscribers() throws Exception {
@@ -133,14 +141,126 @@
         Set<MockVmsListener> expectedListeneres = new HashSet<MockVmsListener>();
         expectedListeneres.add(listenerWithoutLayer);
         assertEquals(expectedListeneres,
-            mRouting.getListeners(LAYER_WITHOUT_SUBSCRIPTION));
+                mRouting.getListeners(LAYER_WITHOUT_SUBSCRIPTION));
+    }
+
+    public void testAddingSubscribersAndHalLayersAndSubscribersToPublishers() throws Exception {
+        // Add a subscription to a layer.
+        MockVmsListener listener = new MockVmsListener();
+        mRouting.addSubscription(listener, LAYER_WITH_SUBSCRIPTION_1);
+
+        // Add a HAL subscription.
+        mRouting.addHalSubscription(LAYER_WITH_SUBSCRIPTION_2);
+
+        mRouting.addSubscription(listener, LAYER_WITH_SUBSCRIPTION_1, PUBLISHER_ID_1);
+        mRouting.addSubscription(listener, LAYER_WITH_SUBSCRIPTION_1, PUBLISHER_ID_2);
+        mRouting.addSubscription(listener, LAYER_WITH_SUBSCRIPTION_2, PUBLISHER_ID_2);
+
+        // Verify expected subscriptions are in routing manager.
+        Set<VmsLayer> expectedSubscriptions = new HashSet<>();
+        expectedSubscriptions.add(LAYER_WITH_SUBSCRIPTION_1);
+        expectedSubscriptions.add(LAYER_WITH_SUBSCRIPTION_2);
+
+        Set<VmsAssociatedLayer> expectedSubscriptionsToPublishers = new HashSet<>();
+        expectedSubscriptionsToPublishers.add(new VmsAssociatedLayer(LAYER_WITH_SUBSCRIPTION_1,
+                new HashSet(Arrays.asList(PUBLISHER_ID_1, PUBLISHER_ID_2))));
+        expectedSubscriptionsToPublishers.add(new VmsAssociatedLayer(LAYER_WITH_SUBSCRIPTION_2,
+                new HashSet(Arrays.asList(PUBLISHER_ID_2))));
+
+        VmsSubscriptionState subscriptionState = mRouting.getSubscriptionState();
+        assertEquals(5, subscriptionState.getSequenceNumber());
+        assertEquals(expectedSubscriptions,
+                new HashSet<>(subscriptionState.getSubscribedLayersFromAll()));
+
+        assertEquals(expectedSubscriptionsToPublishers,
+                subscriptionState.getSubscribedLayersFromPublishers());
+
+        // Verify there is only a single listener.
+        assertEquals(1,
+                mRouting.getListeners(LAYER_WITH_SUBSCRIPTION_1).size());
+    }
+
+
+    public void testRemovalOfSubscribersToPublishers() throws Exception {
+        // Add a subscription to a layer.
+        MockVmsListener listener = new MockVmsListener();
+        mRouting.addSubscription(listener, LAYER_WITH_SUBSCRIPTION_1);
+
+        // Add a HAL subscription.
+        mRouting.addHalSubscription(LAYER_WITH_SUBSCRIPTION_2);
+
+        mRouting.addSubscription(listener, LAYER_WITH_SUBSCRIPTION_1, PUBLISHER_ID_1);
+        mRouting.addSubscription(listener, LAYER_WITH_SUBSCRIPTION_1, PUBLISHER_ID_2);
+        mRouting.addSubscription(listener, LAYER_WITH_SUBSCRIPTION_2, PUBLISHER_ID_2);
+        mRouting.removeSubscription(listener, LAYER_WITH_SUBSCRIPTION_2, PUBLISHER_ID_2);
+
+        // Verify expected subscriptions are in routing manager.
+        Set<VmsLayer> expectedSubscriptions = new HashSet<>();
+        expectedSubscriptions.add(LAYER_WITH_SUBSCRIPTION_1);
+        expectedSubscriptions.add(LAYER_WITH_SUBSCRIPTION_2);
+
+
+        Set<VmsAssociatedLayer> expectedSubscriptionsToPublishers = new HashSet<>();
+        expectedSubscriptionsToPublishers.add(new VmsAssociatedLayer(LAYER_WITH_SUBSCRIPTION_1,
+                new HashSet(Arrays.asList(PUBLISHER_ID_1, PUBLISHER_ID_2))));
+
+        VmsSubscriptionState subscriptionState = mRouting.getSubscriptionState();
+        assertEquals(6, subscriptionState.getSequenceNumber());
+        assertEquals(expectedSubscriptions,
+                new HashSet<>(subscriptionState.getSubscribedLayersFromAll()));
+
+        assertEquals(expectedSubscriptionsToPublishers,
+                subscriptionState.getSubscribedLayersFromPublishers());
+
+        // Verify there is only a single listener.
+        assertEquals(1,
+                mRouting.getListeners(LAYER_WITH_SUBSCRIPTION_1).size());
+    }
+
+    public void testRemovalOfSubscribersToPublishersClearListForPublisher() throws Exception {
+        // Add a subscription to a layer.
+        MockVmsListener listener = new MockVmsListener();
+        mRouting.addSubscription(listener, LAYER_WITH_SUBSCRIPTION_1);
+
+        // Add a HAL subscription.
+        mRouting.addHalSubscription(LAYER_WITH_SUBSCRIPTION_2);
+
+        mRouting.addSubscription(listener, LAYER_WITH_SUBSCRIPTION_1, PUBLISHER_ID_1);
+        mRouting.addSubscription(listener, LAYER_WITH_SUBSCRIPTION_1, PUBLISHER_ID_2);
+        mRouting.addSubscription(listener, LAYER_WITH_SUBSCRIPTION_2, PUBLISHER_ID_2);
+        mRouting.removeSubscription(listener, LAYER_WITH_SUBSCRIPTION_1, PUBLISHER_ID_1);
+
+        // Verify expected subscriptions are in routing manager.
+        Set<VmsLayer> expectedSubscriptions = new HashSet<>();
+        expectedSubscriptions.add(LAYER_WITH_SUBSCRIPTION_1);
+        expectedSubscriptions.add(LAYER_WITH_SUBSCRIPTION_2);
+
+        Set<VmsAssociatedLayer> expectedSubscriptionsToPublishers = new HashSet<>();
+        expectedSubscriptionsToPublishers.add(new VmsAssociatedLayer(LAYER_WITH_SUBSCRIPTION_1,
+                new HashSet(Arrays.asList(PUBLISHER_ID_2))));
+        expectedSubscriptionsToPublishers.add(new VmsAssociatedLayer(LAYER_WITH_SUBSCRIPTION_2,
+                new HashSet(Arrays.asList(PUBLISHER_ID_2))));
+
+        VmsSubscriptionState subscriptionState = mRouting.getSubscriptionState();
+        assertEquals(6, subscriptionState.getSequenceNumber());
+        assertEquals(expectedSubscriptions,
+                new HashSet<>(subscriptionState.getSubscribedLayersFromAll()));
+
+        assertEquals(expectedSubscriptionsToPublishers,
+                subscriptionState.getSubscribedLayersFromPublishers());
+
+        // Verify there is only a single listener.
+        assertEquals(1,
+                mRouting.getListeners(LAYER_WITH_SUBSCRIPTION_1).size());
     }
 
     class MockVmsListener extends IVmsSubscriberClient.Stub {
         @Override
-        public void onVmsMessageReceived(VmsLayer layer, byte[] payload) {}
+        public void onVmsMessageReceived(VmsLayer layer, byte[] payload) {
+        }
 
         @Override
-        public void onLayersAvailabilityChange(List<VmsAssociatedLayer> availableLayers) {}
+        public void onLayersAvailabilityChange(List<VmsAssociatedLayer> availableLayers) {
+        }
     }
 }
\ No newline at end of file