Expand integration testing of VMS services.

Bug: 133344007
Test: atest AndroidCarApiTest CarServiceTest CarServiceUnitTest
Change-Id: I80257c8bd66c6ffc4f4779cb29e4f90c416fa10f
diff --git a/tests/carservice_test/src/com/android/car/MockedVmsTestBase.java b/tests/carservice_test/src/com/android/car/MockedVmsTestBase.java
index 4c26e11..c4a8fbf 100644
--- a/tests/carservice_test/src/com/android/car/MockedVmsTestBase.java
+++ b/tests/carservice_test/src/com/android/car/MockedVmsTestBase.java
@@ -38,6 +38,8 @@
 import com.android.car.vehiclehal.VehiclePropValueBuilder;
 import com.android.car.vehiclehal.test.MockedVehicleHal;
 
+import org.junit.Before;
+
 import java.util.List;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.CountDownLatch;
@@ -73,10 +75,8 @@
                 .addAreaConfig(VehicleAreaType.VEHICLE_AREA_TYPE_GLOBAL, 0, 0);
     }
 
-    @Override
-    public void setUp() throws Exception {
-        super.setUp();
-
+    @Before
+    public void setUpVms() throws Exception {
         // Trigger VmsClientManager to bind to the MockPublisherClient
         getContext().sendBroadcastAsUser(new Intent(Intent.ACTION_USER_UNLOCKED), UserHandle.ALL);
 
@@ -107,16 +107,12 @@
         return mVmsSubscriberManager;
     }
 
-    Pair<VmsLayer, byte[]> receiveDataMessage() throws InterruptedException {
-        return receiveWithTimeout(mSubscriberClient.mMessages);
-    }
-
-    VmsAvailableLayers receiveLayerAvailability() throws InterruptedException {
-        return receiveWithTimeout(mSubscriberClient.mAvailableLayers);
-    }
-
-    MockPublisherClient getMockPublisherClient() throws InterruptedException {
-        sPublisherIsReady.await(2L, TimeUnit.SECONDS);
+    MockPublisherClient getMockPublisherClient() {
+        try {
+            sPublisherIsReady.await(2L, TimeUnit.SECONDS);
+        } catch (InterruptedException e) {
+            throw new RuntimeException(e);
+        }
         return sPublisherClient;
     }
 
@@ -145,7 +141,7 @@
             mSubscriptionState.add(subscriptionState);
         }
 
-        VmsSubscriptionState receiveSubscriptionState() throws InterruptedException {
+        VmsSubscriptionState receiveSubscriptionState() {
             return receiveWithTimeout(mSubscriptionState);
         }
     }
@@ -166,11 +162,11 @@
             mAvailableLayers.add(availableLayers);
         }
 
-        Pair<VmsLayer, byte[]> receiveMessage() throws InterruptedException {
+        Pair<VmsLayer, byte[]> receiveMessage() {
             return receiveWithTimeout(mMessages);
         }
 
-        VmsAvailableLayers receiveLayerAvailability() throws InterruptedException {
+        VmsAvailableLayers receiveLayerAvailability() {
             return receiveWithTimeout(mAvailableLayers);
         }
     }
@@ -201,12 +197,16 @@
                             .build());
         }
 
-        VehiclePropValue receiveMessage() throws InterruptedException {
+        VehiclePropValue receiveMessage() {
             return receiveWithTimeout(mMessages);
         }
     }
 
-    private static <T> T receiveWithTimeout(BlockingQueue<T> queue) throws InterruptedException {
-        return queue.poll(2L, TimeUnit.SECONDS);
+    private static <T> T receiveWithTimeout(BlockingQueue<T> queue) {
+        try {
+            return queue.poll(2L, TimeUnit.SECONDS);
+        } catch (InterruptedException e) {
+            throw new RuntimeException(e);
+        }
     }
 }
diff --git a/tests/carservice_test/src/com/android/car/VmsPublisherSubscriberTest.java b/tests/carservice_test/src/com/android/car/VmsPublisherSubscriberTest.java
index 82b89ec..87cf1b8 100644
--- a/tests/carservice_test/src/com/android/car/VmsPublisherSubscriberTest.java
+++ b/tests/carservice_test/src/com/android/car/VmsPublisherSubscriberTest.java
@@ -18,107 +18,582 @@
 
 import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
 
 import android.car.vms.VmsAssociatedLayer;
 import android.car.vms.VmsAvailableLayers;
 import android.car.vms.VmsLayer;
 import android.car.vms.VmsLayerDependency;
 import android.car.vms.VmsLayersOffering;
+import android.car.vms.VmsSubscriberManager;
+import android.car.vms.VmsSubscriptionState;
 import android.util.Pair;
 
 import androidx.test.filters.MediumTest;
 
+import org.junit.Before;
 import org.junit.Test;
 
-import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashSet;
-import java.util.List;
 import java.util.Set;
+import java.util.function.Supplier;
+import java.util.function.ToIntFunction;
 
 @MediumTest
 public class VmsPublisherSubscriberTest extends MockedVmsTestBase {
-    private static final int LAYER_ID = 88;
-    private static final int LAYER_VERSION = 19;
-    private static final int LAYER_SUBTYPE = 55;
-    private static final String TAG = "VmsPubSubTest";
+    private static final VmsLayer SUBSCRIPTION_LAYER = new VmsLayer(1, 1, 1);
+    private static final VmsLayer SUBSCRIPTION_LAYER_OTHER = new VmsLayer(2, 1, 1);
 
-    // The expected publisher ID is 0 since it the expected assigned ID from the VMS core.
-    public static final int EXPECTED_PUBLISHER_ID = 0;
-    public static final VmsLayer LAYER = new VmsLayer(LAYER_ID, LAYER_SUBTYPE, LAYER_VERSION);
-    public static final VmsAssociatedLayer ASSOCIATED_LAYER =
-            new VmsAssociatedLayer(LAYER, new HashSet<>(Arrays.asList(EXPECTED_PUBLISHER_ID)));
-    public static final byte[] PAYLOAD = new byte[]{2, 3, 5, 7, 11, 13, 17};
+    private static final byte[] PAYLOAD = {0xa, 0xb};
+    private static final byte[] PAYLOAD_OTHER = {0xb, 0xc};
 
-    private static final List<VmsAssociatedLayer> AVAILABLE_ASSOCIATED_LAYERS =
-            new ArrayList<>(Arrays.asList(ASSOCIATED_LAYER));
-    private static final VmsAvailableLayers AVAILABLE_LAYERS_WITH_SEQ =
-            new VmsAvailableLayers(
-                    new HashSet(AVAILABLE_ASSOCIATED_LAYERS), 1);
+    private static final byte[] PUBLISHER_INFO = {0x0};
+    private static final byte[] PUBLISHER_INFO_OTHER = {0x1};
 
+    private static final int UNKNOWN_PUBLISHER = 99999;
 
-    private static final int SUBSCRIBED_LAYER_ID = 89;
-    public static final VmsLayer SUBSCRIBED_LAYER =
-            new VmsLayer(SUBSCRIBED_LAYER_ID, LAYER_SUBTYPE, LAYER_VERSION);
-    public static final VmsAssociatedLayer ASSOCIATED_SUBSCRIBED_LAYER =
-            new VmsAssociatedLayer(SUBSCRIBED_LAYER,
-                    new HashSet<>(Arrays.asList(EXPECTED_PUBLISHER_ID)));
-    private static final List<VmsAssociatedLayer>
-            AVAILABLE_ASSOCIATED_LAYERS_WITH_SUBSCRIBED_LAYER =
-            new ArrayList<>(Arrays.asList(ASSOCIATED_LAYER, ASSOCIATED_SUBSCRIBED_LAYER));
-    private static final VmsAvailableLayers AVAILABLE_LAYERS_WITH_SUBSCRIBED_LAYER_WITH_SEQ =
-            new VmsAvailableLayers(
-                    new HashSet(AVAILABLE_ASSOCIATED_LAYERS_WITH_SUBSCRIBED_LAYER), 1);
+    private MockPublisherClient mPublisher;
+    private VmsSubscriberManager mSubscriber;
+    private MockSubscriberClient mSubscriberClient;
 
-    /*
-     * This test method subscribes to a layer and triggers
-     * VmsPublisherClientMockService.onVmsSubscriptionChange. In turn, the mock service will publish
-     * a message, which is validated in this test.
-     */
-    @Test
-    public void testPublisherToSubscriber() throws Exception {
-        getSubscriberManager().subscribe(LAYER);
-
-        int publisherId = getMockPublisherClient().getPublisherId(PAYLOAD);
-        getMockPublisherClient().publish(LAYER, publisherId, PAYLOAD);
-
-        Pair<VmsLayer, byte[]> dataMessage = receiveDataMessage();
-        assertEquals(LAYER, dataMessage.first);
-        assertArrayEquals(PAYLOAD, dataMessage.second);
+    @Before
+    public void setUpClients() {
+        mPublisher = getMockPublisherClient();
+        mSubscriber = getSubscriberManager();
+        mSubscriberClient = getMockSubscriberClient();
     }
 
-    /**
-     * The Mock service will get a publisher ID by sending its information when it will get
-     * ServiceReady as well as on SubscriptionChange. Since clients are not notified when
-     * publishers are assigned IDs, this test waits until the availability is changed which
-     * indicates
-     * that the Mock service has gotten its ServiceReady and publisherId.
-     */
     @Test
-    public void testPublisherInfo() throws Exception {
-        int publisherId = getMockPublisherClient().getPublisherId(PAYLOAD);
-        byte[] info = getSubscriberManager().getPublisherInfo(publisherId);
-        assertArrayEquals(PAYLOAD, info);
+    public void testPublisherInfo_Unregistered() {
+        assertEquals(0, mSubscriber.getPublisherInfo(UNKNOWN_PUBLISHER).length);
     }
 
-    /*
-     * The Mock service offers all the subscribed layers as available layers.
-     * In this test the client subscribes to a layer and verifies that it gets the
-     * notification that it is available.
-     */
     @Test
-    public void testAvailabilityWithSubscription() throws Exception {
-        int publisherId = getMockPublisherClient().getPublisherId(PAYLOAD);
-        getMockPublisherClient().setLayersOffering(new VmsLayersOffering(
-                new HashSet<>(Arrays.asList(
-                        new VmsLayerDependency(LAYER),
-                        new VmsLayerDependency(SUBSCRIBED_LAYER))),
+    public void testPublisherInfo_Registered() {
+        int publisherId = mPublisher.getPublisherId(PUBLISHER_INFO);
+        assertArrayEquals(PUBLISHER_INFO, mSubscriber.getPublisherInfo(publisherId));
+    }
+
+    @Test
+    public void testPublisherId_AlreadyRegistered() {
+        int publisherId = mPublisher.getPublisherId(PUBLISHER_INFO);
+        int publisherId2 = mPublisher.getPublisherId(PUBLISHER_INFO);
+        assertEquals(publisherId, publisherId2);
+    }
+
+    @Test
+    public void testPublisherId_MultiplePublishers() {
+        int publisherId = mPublisher.getPublisherId(PUBLISHER_INFO);
+        int publisherId2 = mPublisher.getPublisherId(PUBLISHER_INFO_OTHER);
+        assertNotEquals(publisherId, publisherId2);
+        assertArrayEquals(PUBLISHER_INFO, mSubscriber.getPublisherInfo(publisherId));
+        assertArrayEquals(PUBLISHER_INFO_OTHER, mSubscriber.getPublisherInfo(publisherId2));
+    }
+
+    @Test
+    public void testLayerAvailability_Default() {
+        VmsAvailableLayers availableLayers = mSubscriber.getAvailableLayers();
+        assertEquals(Collections.emptySet(), availableLayers.getAssociatedLayers());
+        assertEquals(0, availableLayers.getSequence());
+    }
+
+    @Test
+    public void testLayerAvailability() {
+        int publisherId = mPublisher.getPublisherId(PUBLISHER_INFO);
+        mPublisher.setLayersOffering(new VmsLayersOffering(Collections.singleton(
+                new VmsLayerDependency(SUBSCRIPTION_LAYER, Collections.emptySet())), publisherId));
+
+        assertLayerAvailability(1,
+                new VmsAssociatedLayer(SUBSCRIPTION_LAYER, Collections.singleton(publisherId)));
+    }
+
+    @Test
+    public void testLayerAvailability_Overwrite() {
+        int publisherId = mPublisher.getPublisherId(PUBLISHER_INFO);
+        mPublisher.setLayersOffering(new VmsLayersOffering(Collections.singleton(
+                new VmsLayerDependency(SUBSCRIPTION_LAYER, Collections.emptySet())), publisherId));
+        mPublisher.setLayersOffering(new VmsLayersOffering(Collections.singleton(
+                new VmsLayerDependency(SUBSCRIPTION_LAYER_OTHER, Collections.emptySet())),
                 publisherId));
 
-        Set<VmsAssociatedLayer> associatedLayers =
-                AVAILABLE_LAYERS_WITH_SUBSCRIBED_LAYER_WITH_SEQ.getAssociatedLayers();
-        assertEquals(associatedLayers, receiveLayerAvailability().getAssociatedLayers());
-        assertEquals(associatedLayers,
-                getSubscriberManager().getAvailableLayers().getAssociatedLayers());
+        assertLayerAvailability(2,
+                new VmsAssociatedLayer(SUBSCRIPTION_LAYER_OTHER,
+                        Collections.singleton(publisherId)));
+    }
+
+    @Test
+    public void testLayerAvailability_MultiplePublishers_SameLayer() {
+        int publisherId = mPublisher.getPublisherId(PUBLISHER_INFO);
+        int publisherId2 = mPublisher.getPublisherId(PUBLISHER_INFO_OTHER);
+        mPublisher.setLayersOffering(new VmsLayersOffering(Collections.singleton(
+                new VmsLayerDependency(SUBSCRIPTION_LAYER, Collections.emptySet())), publisherId));
+        mPublisher.setLayersOffering(new VmsLayersOffering(Collections.singleton(
+                new VmsLayerDependency(SUBSCRIPTION_LAYER, Collections.emptySet())), publisherId2));
+
+        assertLayerAvailability(2,
+                new VmsAssociatedLayer(SUBSCRIPTION_LAYER,
+                        new HashSet<>(Arrays.asList(publisherId, publisherId2))));
+    }
+
+    @Test
+    public void testLayerAvailability_MultiplePublishers_MultipleLayers() {
+        int publisherId = mPublisher.getPublisherId(PUBLISHER_INFO);
+        int publisherId2 = mPublisher.getPublisherId(PUBLISHER_INFO_OTHER);
+        mPublisher.setLayersOffering(new VmsLayersOffering(Collections.singleton(
+                new VmsLayerDependency(SUBSCRIPTION_LAYER, Collections.emptySet())), publisherId));
+        mPublisher.setLayersOffering(new VmsLayersOffering(Collections.singleton(
+                new VmsLayerDependency(SUBSCRIPTION_LAYER_OTHER, Collections.emptySet())),
+                publisherId2));
+
+        assertLayerAvailability(2,
+                new VmsAssociatedLayer(SUBSCRIPTION_LAYER, Collections.singleton(publisherId)),
+                new VmsAssociatedLayer(SUBSCRIPTION_LAYER_OTHER,
+                        Collections.singleton(publisherId2)));
+    }
+
+    @Test
+    public void testLayerAvailability_MultiplePublishers_Remove() {
+        int publisherId = mPublisher.getPublisherId(PUBLISHER_INFO);
+        int publisherId2 = mPublisher.getPublisherId(PUBLISHER_INFO_OTHER);
+        mPublisher.setLayersOffering(new VmsLayersOffering(Collections.singleton(
+                new VmsLayerDependency(SUBSCRIPTION_LAYER, Collections.emptySet())), publisherId));
+        mPublisher.setLayersOffering(new VmsLayersOffering(Collections.singleton(
+                new VmsLayerDependency(SUBSCRIPTION_LAYER, Collections.emptySet())), publisherId2));
+
+        mPublisher.setLayersOffering(new VmsLayersOffering(Collections.emptySet(), publisherId2));
+
+        assertLayerAvailability(3,
+                new VmsAssociatedLayer(SUBSCRIPTION_LAYER, Collections.singleton(publisherId)));
+    }
+
+    @Test
+    public void testLayerAvailability_MultiplePublishers_Overwrite() {
+        int publisherId = mPublisher.getPublisherId(PUBLISHER_INFO);
+        int publisherId2 = mPublisher.getPublisherId(PUBLISHER_INFO_OTHER);
+        mPublisher.setLayersOffering(new VmsLayersOffering(Collections.singleton(
+                new VmsLayerDependency(SUBSCRIPTION_LAYER, Collections.emptySet())), publisherId));
+        mPublisher.setLayersOffering(new VmsLayersOffering(Collections.singleton(
+                new VmsLayerDependency(SUBSCRIPTION_LAYER, Collections.emptySet())), publisherId2));
+
+        mPublisher.setLayersOffering(new VmsLayersOffering(Collections.singleton(
+                new VmsLayerDependency(SUBSCRIPTION_LAYER_OTHER, Collections.emptySet())),
+                publisherId2));
+
+        assertLayerAvailability(3,
+                new VmsAssociatedLayer(SUBSCRIPTION_LAYER, Collections.singleton(publisherId)),
+                new VmsAssociatedLayer(SUBSCRIPTION_LAYER_OTHER,
+                        Collections.singleton(publisherId2)));
+    }
+
+    @Test
+    public void testStartMonitoring() {
+        mSubscriber.startMonitoring();
+        assertNull(mPublisher.receiveSubscriptionState());
+
+        int publisherId = mPublisher.getPublisherId(PUBLISHER_INFO);
+        mPublisher.publish(SUBSCRIPTION_LAYER, publisherId, PAYLOAD);
+        assertDataMessage(SUBSCRIPTION_LAYER, PAYLOAD);
+    }
+
+    @Test
+    public void testStartMonitoring_AfterPublish() {
+        int publisherId = mPublisher.getPublisherId(PUBLISHER_INFO);
+        mPublisher.publish(SUBSCRIPTION_LAYER, publisherId, PAYLOAD);
+
+        mSubscriber.startMonitoring();
+        assertNull(mSubscriberClient.receiveMessage());
+    }
+
+    @Test
+    public void testStartMonitoring_MultipleLayers() {
+        mSubscriber.startMonitoring();
+
+        int publisherId = mPublisher.getPublisherId(PUBLISHER_INFO);
+        mPublisher.publish(SUBSCRIPTION_LAYER, publisherId, PAYLOAD);
+        assertDataMessage(SUBSCRIPTION_LAYER, PAYLOAD);
+
+        mPublisher.publish(SUBSCRIPTION_LAYER_OTHER, publisherId, PAYLOAD_OTHER);
+        assertDataMessage(SUBSCRIPTION_LAYER_OTHER, PAYLOAD_OTHER);
+    }
+
+    @Test
+    public void testStartMonitoring_MultiplePublishers() {
+        mSubscriber.startMonitoring();
+
+        int publisherId = mPublisher.getPublisherId(PUBLISHER_INFO);
+        mPublisher.publish(SUBSCRIPTION_LAYER, publisherId, PAYLOAD);
+        assertDataMessage(SUBSCRIPTION_LAYER, PAYLOAD);
+
+        int publisherId2 = mPublisher.getPublisherId(PUBLISHER_INFO_OTHER);
+        mPublisher.publish(SUBSCRIPTION_LAYER, publisherId2, PAYLOAD_OTHER);
+        assertDataMessage(SUBSCRIPTION_LAYER, PAYLOAD_OTHER);
+    }
+
+    @Test
+    public void testStopMonitoring() {
+        mSubscriber.startMonitoring();
+        mSubscriber.stopMonitoring();
+        assertNull(mPublisher.receiveSubscriptionState());
+
+        int publisherId = mPublisher.getPublisherId(PUBLISHER_INFO);
+        mPublisher.publish(SUBSCRIPTION_LAYER, publisherId, PAYLOAD);
+        assertNull(mSubscriberClient.receiveMessage());
+    }
+
+    @Test
+    public void testSubscribe() {
+        mSubscriber.subscribe(SUBSCRIPTION_LAYER);
+        assertSubscriptionState(1, SUBSCRIPTION_LAYER);
+
+        int publisherId = mPublisher.getPublisherId(PUBLISHER_INFO);
+        mPublisher.publish(SUBSCRIPTION_LAYER, publisherId, PAYLOAD);
+        assertDataMessage(SUBSCRIPTION_LAYER, PAYLOAD);
+    }
+
+    @Test
+    public void testSubscribe_AfterPublish() {
+        int publisherId = mPublisher.getPublisherId(PUBLISHER_INFO);
+
+        mPublisher.publish(SUBSCRIPTION_LAYER, publisherId, PAYLOAD);
+        mSubscriber.subscribe(SUBSCRIPTION_LAYER);
+
+        assertSubscriptionState(1, SUBSCRIPTION_LAYER);
+        assertNull(mSubscriberClient.receiveMessage());
+    }
+
+    @Test
+    public void testSubscribe_MultipleLayers() {
+        mSubscriber.subscribe(SUBSCRIPTION_LAYER);
+        mSubscriber.subscribe(SUBSCRIPTION_LAYER_OTHER);
+        assertSubscriptionState(2, SUBSCRIPTION_LAYER, SUBSCRIPTION_LAYER_OTHER);
+
+        int publisherId = mPublisher.getPublisherId(PUBLISHER_INFO);
+        mPublisher.publish(SUBSCRIPTION_LAYER, publisherId, PAYLOAD);
+        assertDataMessage(SUBSCRIPTION_LAYER, PAYLOAD);
+
+        mPublisher.publish(SUBSCRIPTION_LAYER_OTHER, publisherId, PAYLOAD_OTHER);
+        assertDataMessage(SUBSCRIPTION_LAYER_OTHER, PAYLOAD_OTHER);
+    }
+
+    @Test
+    public void testSubscribe_MultiplePublishers() {
+        mSubscriber.subscribe(SUBSCRIPTION_LAYER);
+        assertSubscriptionState(1, SUBSCRIPTION_LAYER);
+
+        int publisherId = mPublisher.getPublisherId(PUBLISHER_INFO);
+        mPublisher.publish(SUBSCRIPTION_LAYER, publisherId, PAYLOAD);
+        assertDataMessage(SUBSCRIPTION_LAYER, PAYLOAD);
+
+        int publisherId2 = mPublisher.getPublisherId(PUBLISHER_INFO_OTHER);
+        mPublisher.publish(SUBSCRIPTION_LAYER, publisherId2, PAYLOAD_OTHER);
+        assertDataMessage(SUBSCRIPTION_LAYER, PAYLOAD_OTHER);
+    }
+
+    @Test
+    public void testSubscribe_MultipleLayers_MultiplePublishers() {
+        mSubscriber.subscribe(SUBSCRIPTION_LAYER);
+        mSubscriber.subscribe(SUBSCRIPTION_LAYER_OTHER);
+        assertSubscriptionState(2, SUBSCRIPTION_LAYER, SUBSCRIPTION_LAYER_OTHER);
+
+        int publisherId = mPublisher.getPublisherId(PUBLISHER_INFO);
+        mPublisher.publish(SUBSCRIPTION_LAYER, publisherId, PAYLOAD);
+        assertDataMessage(SUBSCRIPTION_LAYER, PAYLOAD);
+
+        int publisherId2 = mPublisher.getPublisherId(PUBLISHER_INFO_OTHER);
+        mPublisher.publish(SUBSCRIPTION_LAYER_OTHER, publisherId2, PAYLOAD_OTHER);
+        assertDataMessage(SUBSCRIPTION_LAYER_OTHER, PAYLOAD_OTHER);
+    }
+
+    @Test
+    public void testSubscribe_ClearCallback() {
+        mSubscriber.subscribe(SUBSCRIPTION_LAYER);
+        mSubscriber.clearVmsSubscriberClientCallback();
+
+        int publisherId = mPublisher.getPublisherId(PUBLISHER_INFO);
+        mPublisher.publish(SUBSCRIPTION_LAYER, publisherId, PAYLOAD);
+        assertNull(mSubscriberClient.receiveMessage());
+    }
+
+    @Test(expected = IllegalStateException.class)
+    public void testSubscribe_NoCallback() {
+        mSubscriber.clearVmsSubscriberClientCallback();
+        mSubscriber.subscribe(SUBSCRIPTION_LAYER);
+    }
+
+    @Test
+    public void testUnsubscribe() {
+        mSubscriber.subscribe(SUBSCRIPTION_LAYER);
+        mSubscriber.unsubscribe(SUBSCRIPTION_LAYER);
+        assertSubscriptionState(2, Collections.emptySet(), Collections.emptySet());
+
+        int publisherId = mPublisher.getPublisherId(PUBLISHER_INFO);
+        mPublisher.publish(SUBSCRIPTION_LAYER, publisherId, PAYLOAD);
+        assertNull(mSubscriberClient.receiveMessage());
+    }
+
+    @Test
+    public void testUnsubscribe_MultipleLayers() {
+        mSubscriber.subscribe(SUBSCRIPTION_LAYER);
+        mSubscriber.subscribe(SUBSCRIPTION_LAYER_OTHER);
+        mSubscriber.unsubscribe(SUBSCRIPTION_LAYER);
+        assertSubscriptionState(3, SUBSCRIPTION_LAYER_OTHER);
+
+        int publisherId = mPublisher.getPublisherId(PUBLISHER_INFO);
+        mPublisher.publish(SUBSCRIPTION_LAYER, publisherId, PAYLOAD);
+        assertNull(mSubscriberClient.receiveMessage());
+
+        mPublisher.publish(SUBSCRIPTION_LAYER_OTHER, publisherId, PAYLOAD_OTHER);
+        assertDataMessage(SUBSCRIPTION_LAYER_OTHER, PAYLOAD_OTHER);
+    }
+
+    @Test
+    public void testUnsubscribe_MultiplePublishers() {
+        mSubscriber.subscribe(SUBSCRIPTION_LAYER);
+        mSubscriber.subscribe(SUBSCRIPTION_LAYER_OTHER);
+        mSubscriber.unsubscribe(SUBSCRIPTION_LAYER);
+        assertSubscriptionState(3, SUBSCRIPTION_LAYER_OTHER);
+
+        int publisherId = mPublisher.getPublisherId(PUBLISHER_INFO);
+        mPublisher.publish(SUBSCRIPTION_LAYER, publisherId, PAYLOAD);
+        assertNull(mSubscriberClient.receiveMessage());
+
+        int publisherId2 = mPublisher.getPublisherId(PUBLISHER_INFO_OTHER);
+        mPublisher.publish(SUBSCRIPTION_LAYER, publisherId2, PAYLOAD_OTHER);
+        assertNull(mSubscriberClient.receiveMessage());
+    }
+
+    @Test
+    public void testUnsubscribe_NotSubscribed() {
+        mSubscriber.unsubscribe(SUBSCRIPTION_LAYER);
+        assertNull(mPublisher.receiveSubscriptionState());
+
+        int publisherId = mPublisher.getPublisherId(PUBLISHER_INFO);
+        mPublisher.publish(SUBSCRIPTION_LAYER, publisherId, PAYLOAD);
+        assertNull(mSubscriberClient.receiveMessage());
+    }
+
+    @Test
+    public void testSubscribeToPublisher() {
+        int publisherId = mPublisher.getPublisherId(PUBLISHER_INFO);
+
+        mSubscriber.subscribe(SUBSCRIPTION_LAYER, publisherId);
+        assertSubscriptionState(1,
+                new VmsAssociatedLayer(SUBSCRIPTION_LAYER, Collections.singleton(publisherId)));
+
+        mPublisher.publish(SUBSCRIPTION_LAYER, publisherId, PAYLOAD);
+        assertDataMessage(SUBSCRIPTION_LAYER, PAYLOAD);
+    }
+
+    @Test
+    public void testSubscribeToPublisher_AfterPublish() {
+        int publisherId = mPublisher.getPublisherId(PUBLISHER_INFO);
+
+        mPublisher.publish(SUBSCRIPTION_LAYER, publisherId, PAYLOAD);
+        mSubscriber.subscribe(SUBSCRIPTION_LAYER, publisherId);
+
+        assertSubscriptionState(1,
+                new VmsAssociatedLayer(SUBSCRIPTION_LAYER, Collections.singleton(publisherId)));
+
+        assertNull(mSubscriberClient.receiveMessage());
+    }
+
+    @Test
+    public void testSubscribeToPublisher_MultipleLayers() {
+        int publisherId = mPublisher.getPublisherId(PUBLISHER_INFO);
+
+        mSubscriber.subscribe(SUBSCRIPTION_LAYER, publisherId);
+        mSubscriber.subscribe(SUBSCRIPTION_LAYER_OTHER, publisherId);
+
+        assertSubscriptionState(2,
+                new VmsAssociatedLayer(SUBSCRIPTION_LAYER, Collections.singleton(publisherId)),
+                new VmsAssociatedLayer(SUBSCRIPTION_LAYER_OTHER,
+                        Collections.singleton(publisherId)));
+
+        mPublisher.publish(SUBSCRIPTION_LAYER, publisherId, PAYLOAD);
+        assertDataMessage(SUBSCRIPTION_LAYER, PAYLOAD);
+
+        mPublisher.publish(SUBSCRIPTION_LAYER_OTHER, publisherId, PAYLOAD_OTHER);
+        assertDataMessage(SUBSCRIPTION_LAYER_OTHER, PAYLOAD_OTHER);
+    }
+
+    @Test
+    public void testSubscribeToPublisher_MultiplePublishers() {
+        int publisherId = mPublisher.getPublisherId(PUBLISHER_INFO);
+        int publisherId2 = mPublisher.getPublisherId(PUBLISHER_INFO_OTHER);
+
+        mSubscriber.subscribe(SUBSCRIPTION_LAYER, publisherId);
+        assertSubscriptionState(1,
+                new VmsAssociatedLayer(SUBSCRIPTION_LAYER, Collections.singleton(publisherId)));
+
+        mPublisher.publish(SUBSCRIPTION_LAYER, publisherId, PAYLOAD);
+        assertDataMessage(SUBSCRIPTION_LAYER, PAYLOAD);
+
+        mPublisher.publish(SUBSCRIPTION_LAYER, publisherId2, PAYLOAD_OTHER);
+        assertNull(mSubscriberClient.receiveMessage());
+    }
+
+    @Test
+    public void testSubscribeToPublisher_MultipleLayers_MultiplePublishers() {
+        int publisherId = mPublisher.getPublisherId(PUBLISHER_INFO);
+        int publisherId2 = mPublisher.getPublisherId(PUBLISHER_INFO_OTHER);
+
+        mSubscriber.subscribe(SUBSCRIPTION_LAYER, publisherId);
+        mSubscriber.subscribe(SUBSCRIPTION_LAYER_OTHER, publisherId2);
+        assertSubscriptionState(2,
+                new VmsAssociatedLayer(SUBSCRIPTION_LAYER, Collections.singleton(publisherId)),
+                new VmsAssociatedLayer(SUBSCRIPTION_LAYER_OTHER,
+                        Collections.singleton(publisherId2)));
+
+        mPublisher.publish(SUBSCRIPTION_LAYER, publisherId, PAYLOAD);
+        assertDataMessage(SUBSCRIPTION_LAYER, PAYLOAD);
+
+        mPublisher.publish(SUBSCRIPTION_LAYER_OTHER, publisherId2, PAYLOAD_OTHER);
+        assertDataMessage(SUBSCRIPTION_LAYER_OTHER, PAYLOAD_OTHER);
+    }
+
+    @Test
+    public void testSubscribeToPublisher_UnknownPublisher() {
+        mSubscriber.subscribe(SUBSCRIPTION_LAYER, UNKNOWN_PUBLISHER);
+
+        assertSubscriptionState(1,
+                new VmsAssociatedLayer(SUBSCRIPTION_LAYER,
+                        Collections.singleton(UNKNOWN_PUBLISHER)));
+    }
+
+    @Test
+    public void testSubscribeToPublisher_ClearCallback() {
+        int publisherId = mPublisher.getPublisherId(PUBLISHER_INFO);
+
+        mSubscriber.subscribe(SUBSCRIPTION_LAYER, publisherId);
+        mSubscriber.clearVmsSubscriberClientCallback();
+
+        mPublisher.publish(SUBSCRIPTION_LAYER, publisherId, PAYLOAD);
+        assertNull(mSubscriberClient.receiveMessage());
+    }
+
+    @Test(expected = IllegalStateException.class)
+    public void testSubscribeToPublisher_NoCallback() {
+        int publisherId = mPublisher.getPublisherId(PUBLISHER_INFO);
+
+        mSubscriber.clearVmsSubscriberClientCallback();
+        mSubscriber.subscribe(SUBSCRIPTION_LAYER, publisherId);
+    }
+
+    @Test
+    public void testUnsubscribeToPublisher() {
+        int publisherId = mPublisher.getPublisherId(PUBLISHER_INFO);
+
+        mSubscriber.subscribe(SUBSCRIPTION_LAYER, publisherId);
+        mSubscriber.unsubscribe(SUBSCRIPTION_LAYER, publisherId);
+        assertSubscriptionState(2, Collections.emptySet(), Collections.emptySet());
+
+        mPublisher.publish(SUBSCRIPTION_LAYER, publisherId, PAYLOAD);
+        assertNull(mSubscriberClient.receiveMessage());
+    }
+
+    @Test
+    public void testUnsubscribeToPublisher_MultipleLayers() {
+        int publisherId = mPublisher.getPublisherId(PUBLISHER_INFO);
+
+        mSubscriber.subscribe(SUBSCRIPTION_LAYER, publisherId);
+        mSubscriber.subscribe(SUBSCRIPTION_LAYER_OTHER, publisherId);
+        mSubscriber.unsubscribe(SUBSCRIPTION_LAYER, publisherId);
+        assertSubscriptionState(3,
+                new VmsAssociatedLayer(SUBSCRIPTION_LAYER_OTHER,
+                        Collections.singleton(publisherId)));
+
+        mPublisher.publish(SUBSCRIPTION_LAYER, publisherId, PAYLOAD);
+        assertNull(mSubscriberClient.receiveMessage());
+
+        mPublisher.publish(SUBSCRIPTION_LAYER_OTHER, publisherId, PAYLOAD_OTHER);
+        assertDataMessage(SUBSCRIPTION_LAYER_OTHER, PAYLOAD_OTHER);
+    }
+
+    @Test
+    public void testUnsubscribeToPublisher_MultiplePublishers() {
+        int publisherId = mPublisher.getPublisherId(PUBLISHER_INFO);
+        int publisherId2 = mPublisher.getPublisherId(PUBLISHER_INFO_OTHER);
+
+        mSubscriber.subscribe(SUBSCRIPTION_LAYER, publisherId);
+        mSubscriber.subscribe(SUBSCRIPTION_LAYER, publisherId2);
+        mSubscriber.unsubscribe(SUBSCRIPTION_LAYER, publisherId);
+        assertSubscriptionState(3,
+                new VmsAssociatedLayer(SUBSCRIPTION_LAYER, Collections.singleton(publisherId2)));
+
+        mPublisher.publish(SUBSCRIPTION_LAYER, publisherId, PAYLOAD);
+        assertNull(mSubscriberClient.receiveMessage());
+
+        mPublisher.publish(SUBSCRIPTION_LAYER, publisherId2, PAYLOAD_OTHER);
+        assertDataMessage(SUBSCRIPTION_LAYER, PAYLOAD_OTHER);
+    }
+
+    @Test
+    public void testUnsubscribeToPublisher_NotSubscribed() {
+        int publisherId = mPublisher.getPublisherId(PUBLISHER_INFO);
+
+        mSubscriber.unsubscribe(SUBSCRIPTION_LAYER, publisherId);
+        assertNull(mPublisher.receiveSubscriptionState());
+
+        mPublisher.publish(SUBSCRIPTION_LAYER, publisherId, PAYLOAD);
+        assertNull(mSubscriberClient.receiveMessage());
+    }
+
+    @Test
+    public void testUnsubscribeToPublisher_UnknownPublisher() {
+        mSubscriber.unsubscribe(SUBSCRIPTION_LAYER, UNKNOWN_PUBLISHER);
+        assertNull(mPublisher.receiveSubscriptionState());
+    }
+
+    private void assertLayerAvailability(int sequenceNumber,
+            VmsAssociatedLayer... associatedLayers) {
+        VmsAvailableLayers availableLayers = receiveWithSequence(
+                getMockSubscriberClient()::receiveLayerAvailability,
+                VmsAvailableLayers::getSequence,
+                sequenceNumber);
+        assertEquals(availableLayers, mSubscriber.getAvailableLayers());
+        assertEquals(new HashSet<>(Arrays.asList(associatedLayers)),
+                availableLayers.getAssociatedLayers());
+    }
+
+    private void assertSubscriptionState(int sequenceNumber, VmsLayer... layers) {
+        assertSubscriptionState(sequenceNumber, new HashSet<>(Arrays.asList(layers)),
+                Collections.emptySet());
+    }
+
+    private void assertSubscriptionState(int sequenceNumber,
+            VmsAssociatedLayer... associatedLayers) {
+        assertSubscriptionState(sequenceNumber, Collections.emptySet(),
+                new HashSet<>(Arrays.asList(associatedLayers)));
+    }
+
+    private void assertSubscriptionState(int sequenceNumber, Set<VmsLayer> layers,
+            Set<VmsAssociatedLayer> associatedLayers) {
+        VmsSubscriptionState subscriptionState = receiveWithSequence(
+                mPublisher::receiveSubscriptionState,
+                VmsSubscriptionState::getSequenceNumber,
+                sequenceNumber);
+        assertEquals(layers, subscriptionState.getLayers());
+        assertEquals(associatedLayers, subscriptionState.getAssociatedLayers());
+    }
+
+    private static <T> T receiveWithSequence(Supplier<T> supplierFunction,
+            ToIntFunction<T> sequenceNumberFn, int sequenceNumber) {
+        T obj = null;
+        for (int seq = 1; seq <= sequenceNumber; seq++) {
+            obj = supplierFunction.get();
+            assertNotNull(obj);
+            assertEquals(seq, sequenceNumberFn.applyAsInt(obj));
+        }
+        return obj;
+    }
+
+    private void assertDataMessage(VmsLayer layer, byte[] payload) {
+        Pair<VmsLayer, byte[]> message = mSubscriberClient.receiveMessage();
+        assertEquals(layer, message.first);
+        assertArrayEquals(payload, message.second);
     }
 }