Use SharedMemory for transporting large packets.

Bug: 151326583
Test: atest CarServiceTest CarServiceUnitTest
Change-Id: Ifae8581d3d982697e6a23e203070c73906d80c18
diff --git a/car-lib/src/android/car/vms/IVmsBrokerService.aidl b/car-lib/src/android/car/vms/IVmsBrokerService.aidl
index cb70323..28b50a6 100644
--- a/car-lib/src/android/car/vms/IVmsBrokerService.aidl
+++ b/car-lib/src/android/car/vms/IVmsBrokerService.aidl
@@ -22,6 +22,7 @@
 import android.car.vms.VmsLayerDependency;
 import android.car.vms.VmsProviderInfo;
 import android.car.vms.VmsRegistrationInfo;
+import android.os.SharedMemory;
 
 /**
  * Hidden API for communicating with the Vehicle Map Service message broker.
@@ -68,4 +69,10 @@
         int providerId,
         in VmsLayer layer,
         in byte[] packet) = 7;
+
+    void publishLargePacket(
+        in IBinder token,
+        int providerId,
+        in VmsLayer layer,
+        in SharedMemory packet) = 8;
 }
diff --git a/car-lib/src/android/car/vms/IVmsClientCallback.aidl b/car-lib/src/android/car/vms/IVmsClientCallback.aidl
index 5c8886d..c9b2f96 100644
--- a/car-lib/src/android/car/vms/IVmsClientCallback.aidl
+++ b/car-lib/src/android/car/vms/IVmsClientCallback.aidl
@@ -19,6 +19,7 @@
 import android.car.vms.VmsAvailableLayers;
 import android.car.vms.VmsLayer;
 import android.car.vms.VmsSubscriptionState;
+import android.os.SharedMemory;
 
 /**
  * Hidden API for sending notifications to Vehicle Map Service clients.
@@ -36,4 +37,9 @@
         int providerId,
         in VmsLayer layer,
         in byte[] packet) = 2;
+
+    void onLargePacketReceived(
+        int providerId,
+        in VmsLayer layer,
+        in SharedMemory packet) = 3;
 }
diff --git a/car-lib/src/android/car/vms/VmsClient.java b/car-lib/src/android/car/vms/VmsClient.java
index b580ced..394ba67 100644
--- a/car-lib/src/android/car/vms/VmsClient.java
+++ b/car-lib/src/android/car/vms/VmsClient.java
@@ -16,6 +16,8 @@
 
 package android.car.vms;
 
+import static android.system.OsConstants.PROT_READ;
+
 import android.annotation.NonNull;
 import android.annotation.Nullable;
 import android.annotation.RequiresPermission;
@@ -25,11 +27,14 @@
 import android.os.Binder;
 import android.os.IBinder;
 import android.os.RemoteException;
+import android.os.SharedMemory;
+import android.system.ErrnoException;
 import android.util.Log;
 
 import com.android.internal.annotations.GuardedBy;
 
 import java.lang.ref.WeakReference;
+import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Objects;
@@ -54,6 +59,7 @@
             new VmsAvailableLayers(Collections.emptySet(), 0);
     private static final VmsSubscriptionState DEFAULT_SUBSCRIPTIONS =
             new VmsSubscriptionState(0, Collections.emptySet(), Collections.emptySet());
+    private static final int LARGE_PACKET_THRESHOLD = 16 * 1024; // 16 KB
 
     private final IVmsBrokerService mService;
     private final Executor mExecutor;
@@ -236,11 +242,20 @@
      */
     @RequiresPermission(Car.PERMISSION_VMS_PUBLISHER)
     public void publishPacket(int providerId, @NonNull VmsLayer layer, @NonNull byte[] packet) {
-        if (DBG) Log.d(TAG, "Publishing packet as " + providerId);
         Objects.requireNonNull(layer, "layer cannot be null");
         Objects.requireNonNull(packet, "packet cannot be null");
+        if (DBG) {
+            Log.d(TAG, "Publishing packet as " + providerId + " (" + packet.length + " bytes)");
+        }
         try {
-            mService.publishPacket(mClientToken, providerId, layer, packet);
+            if (packet.length < LARGE_PACKET_THRESHOLD) {
+                mService.publishPacket(mClientToken, providerId, layer, packet);
+            } else {
+                try (SharedMemory largePacket = packetToSharedMemory(packet)) {
+                    mService.publishLargePacket(mClientToken, providerId, layer,
+                            largePacket);
+                }
+            }
         } catch (RemoteException e) {
             Log.e(TAG, "While publishing packet as " + providerId);
             mExceptionHandler.accept(e);
@@ -312,11 +327,27 @@
 
         @Override
         public void onPacketReceived(int providerId, VmsLayer layer, byte[] packet) {
-            if (DBG) Log.d(TAG, "Received packet from " + providerId + " for: " + layer);
+            if (DBG) {
+                Log.d(TAG, "Received packet from " + providerId + " for: " + layer
+                        + " (" + packet.length + " bytes)");
+            }
             executeCallback((client, callback) ->
                     callback.onPacketReceived(providerId, layer, packet));
         }
 
+        @Override
+        public void onLargePacketReceived(int providerId, VmsLayer layer, SharedMemory packet) {
+            try (SharedMemory largePacket = packet) {
+                if (DBG) {
+                    Log.d(TAG, "Received large packet from " + providerId + " for: " + layer
+                            + " (" + largePacket.getSize() + " bytes)");
+                }
+                byte[] packetData = sharedMemoryToPacket(largePacket);
+                executeCallback((client, callback) ->
+                        callback.onPacketReceived(providerId, layer, packetData));
+            }
+        }
+
         private void executeCallback(BiConsumer<VmsClient, VmsClientCallback> callbackOperation) {
             final VmsClient client = mClient.get();
             if (client == null) {
@@ -332,4 +363,51 @@
             }
         }
     }
+
+    private static SharedMemory packetToSharedMemory(byte[] packet) {
+        SharedMemory shm;
+        try {
+            shm = SharedMemory.create("VmsClient", packet.length);
+        } catch (ErrnoException e) {
+            throw new IllegalStateException("Failed to allocate shared memory", e);
+        }
+
+        ByteBuffer buffer = null;
+        try {
+            buffer = shm.mapReadWrite();
+            buffer.put(packet);
+        } catch (ErrnoException e) {
+            shm.close();
+            throw new IllegalStateException("Failed to create write buffer", e);
+        } finally {
+            if (buffer != null) {
+                SharedMemory.unmap(buffer);
+            }
+        }
+
+        if (!shm.setProtect(PROT_READ)) {
+            shm.close();
+            throw new SecurityException("Failed to set read-only protection on shared memory");
+        }
+
+        return shm;
+    }
+
+    private static byte[] sharedMemoryToPacket(SharedMemory shm) {
+        ByteBuffer buffer;
+        try {
+            buffer = shm.mapReadOnly();
+        } catch (ErrnoException e) {
+            throw new IllegalStateException("Failed to create read buffer", e);
+        }
+
+        byte[] packet;
+        try {
+            packet = new byte[buffer.capacity()];
+            buffer.get(packet);
+        } finally {
+            SharedMemory.unmap(buffer);
+        }
+        return packet;
+    }
 }
diff --git a/service/src/com/android/car/vms/VmsNewBrokerService.java b/service/src/com/android/car/vms/VmsNewBrokerService.java
index 8cbc33d..d956bd2 100644
--- a/service/src/com/android/car/vms/VmsNewBrokerService.java
+++ b/service/src/com/android/car/vms/VmsNewBrokerService.java
@@ -35,6 +35,7 @@
 import android.os.Binder;
 import android.os.IBinder;
 import android.os.RemoteException;
+import android.os.SharedMemory;
 import android.util.ArrayMap;
 import android.util.ArraySet;
 import android.util.Log;
@@ -46,6 +47,7 @@
 import com.android.car.stats.VmsClientLogger;
 import com.android.internal.annotations.GuardedBy;
 import com.android.internal.annotations.VisibleForTesting;
+import com.android.internal.util.FunctionalUtils.ThrowingConsumer;
 
 import java.io.PrintWriter;
 import java.util.ArrayList;
@@ -195,13 +197,28 @@
     @Override
     public void publishPacket(IBinder clientToken, int providerId, VmsLayer layer, byte[] packet) {
         assertVmsPublisherPermission(mContext);
+        deliverToSubscribers(clientToken, providerId, layer, packet.length,
+                callback -> callback.onPacketReceived(providerId, layer, packet));
+    }
+
+    @Override
+    public void publishLargePacket(IBinder clientToken, int providerId, VmsLayer layer,
+            SharedMemory packet) {
+        try (SharedMemory largePacket = packet) {
+            assertVmsPublisherPermission(mContext);
+            deliverToSubscribers(clientToken, providerId, layer, packet.getSize(),
+                    callback -> callback.onLargePacketReceived(providerId, layer, largePacket));
+        }
+    }
+
+    private void deliverToSubscribers(IBinder clientToken, int providerId, VmsLayer layer,
+            int packetLength, ThrowingConsumer<IVmsClientCallback> callbackConsumer) {
         VmsClientInfo client = getClient(clientToken);
         if (!client.hasOffering(providerId, layer) && !client.isLegacyClient()) {
             throw new IllegalArgumentException("Client does not offer " + layer + " as "
                     + providerId);
         }
 
-        int packetLength = packet != null ? packet.length : 0;
         mStatsService.getVmsClientLogger(client.getUid())
                 .logPacketSent(layer, packetLength);
 
@@ -222,14 +239,14 @@
 
         for (VmsClientInfo subscriber : subscribers) {
             try {
-                subscriber.getCallback().onPacketReceived(providerId, layer, packet);
+                callbackConsumer.accept(subscriber.getCallback());
                 mStatsService.getVmsClientLogger(subscriber.getUid())
                         .logPacketReceived(layer, packetLength);
-            } catch (RemoteException ex) {
+            } catch (RuntimeException e) {
                 mStatsService.getVmsClientLogger(subscriber.getUid())
                         .logPacketDropped(layer, packetLength);
                 Log.e(TAG, String.format("Unable to publish to listener: %s",
-                        subscriber.getPackageName()));
+                        subscriber.getPackageName()), e);
             }
         }
     }
diff --git a/tests/carservice_test/src/com/android/car/vms/VmsClientTest.java b/tests/carservice_test/src/com/android/car/vms/VmsClientTest.java
index 235e438..257d1ff 100644
--- a/tests/carservice_test/src/com/android/car/vms/VmsClientTest.java
+++ b/tests/carservice_test/src/com/android/car/vms/VmsClientTest.java
@@ -79,6 +79,7 @@
     private static final VmsLayer LAYER3 = new VmsLayer(3, 1, 1);
 
     private static final byte[] PAYLOAD = {1, 2, 3, 4, 5, 6, 7, 8};
+    private static final byte[] LARGE_PAYLOAD = new byte[16 * 1024]; // 16KB
 
     @Rule
     public MockitoRule mMockitoRule = MockitoJUnit.rule();
@@ -96,6 +97,7 @@
     @Before
     public void setUpTest() {
         mClientManager = (VmsClientManager) getCar().getCarManager(Car.VEHICLE_MAP_SERVICE);
+        LARGE_PAYLOAD[0] = 123;
     }
 
     @Test
@@ -2378,6 +2380,183 @@
         verifyPacketReceived(mClientCallback2, providerId, LAYER1, PAYLOAD);
     }
 
+    @Test
+    public void testPublishPacket_Large_UnknownOffering() {
+        VmsClient client = connectVmsClient(mClientCallback1);
+        int providerId = client.registerProvider(PROVIDER_DESC1);
+
+        assertThrows(
+                IllegalArgumentException.class,
+                () -> client.publishPacket(providerId, LAYER1, LARGE_PAYLOAD));
+    }
+
+    @Test
+    public void testPublishPacket_Large_NoSubscribers() {
+        VmsClient client = connectVmsClient(mClientCallback1);
+        int providerId = client.registerProvider(PROVIDER_DESC1);
+        client.setProviderOfferings(providerId, asSet(
+                new VmsLayerDependency(LAYER1)
+        ));
+        connectVmsClient(mClientCallback2);
+
+        client.publishPacket(providerId, LAYER1, LARGE_PAYLOAD);
+
+        awaitTaskCompletion();
+        verifyNoPacketsReceived(mClientCallback1, providerId, LAYER1);
+        verifyNoPacketsReceived(mClientCallback2, providerId, LAYER1);
+    }
+
+    @Test
+    public void testPublishPacket_Large_MonitorSubscriber_Enabled() {
+        VmsClient client = connectVmsClient(mClientCallback1);
+        int providerId = client.registerProvider(PROVIDER_DESC1);
+        client.setProviderOfferings(providerId, asSet(
+                new VmsLayerDependency(LAYER1)
+        ));
+        connectVmsClient(mClientCallback2);
+
+        client.setMonitoringEnabled(true);
+        client.publishPacket(providerId, LAYER1, LARGE_PAYLOAD);
+
+        awaitTaskCompletion();
+        verifyPacketReceived(mClientCallback1, providerId, LAYER1, LARGE_PAYLOAD);
+        verifyNoPacketsReceived(mClientCallback2, providerId, LAYER1);
+    }
+
+    @Test
+    public void testPublishPacket_Large_MonitorSubscriber_EnabledAndDisabled() {
+        VmsClient client = connectVmsClient(mClientCallback1);
+        int providerId = client.registerProvider(PROVIDER_DESC1);
+        client.setProviderOfferings(providerId, asSet(
+                new VmsLayerDependency(LAYER1)
+        ));
+        connectVmsClient(mClientCallback2);
+
+        client.setMonitoringEnabled(true);
+        client.setMonitoringEnabled(false);
+        client.publishPacket(providerId, LAYER1, LARGE_PAYLOAD);
+
+        awaitTaskCompletion();
+        verifyNoPacketsReceived(mClientCallback1, providerId, LAYER1);
+        verifyNoPacketsReceived(mClientCallback2, providerId, LAYER1);
+    }
+
+    @Test
+    public void testPublishPacket_Large_LayerSubscriber() {
+        VmsClient client = connectVmsClient(mClientCallback1);
+        int providerId = client.registerProvider(PROVIDER_DESC1);
+        client.setProviderOfferings(providerId, asSet(
+                new VmsLayerDependency(LAYER1)
+        ));
+        connectVmsClient(mClientCallback2);
+
+        client.setSubscriptions(asSet(
+                new VmsAssociatedLayer(LAYER1, emptySet())
+        ));
+        client.publishPacket(providerId, LAYER1, LARGE_PAYLOAD);
+
+        awaitTaskCompletion();
+        verifyPacketReceived(mClientCallback1, providerId, LAYER1, LARGE_PAYLOAD);
+        verifyNoPacketsReceived(mClientCallback2, providerId, LAYER1);
+    }
+
+    @Test
+    public void testPublishPacket_Large_LayerSubscriber_Unsubscribe() {
+        VmsClient client = connectVmsClient(mClientCallback1);
+        int providerId = client.registerProvider(PROVIDER_DESC1);
+        client.setProviderOfferings(providerId, asSet(
+                new VmsLayerDependency(LAYER1)
+        ));
+        connectVmsClient(mClientCallback2);
+
+        client.setSubscriptions(asSet(
+                new VmsAssociatedLayer(LAYER1, emptySet())
+        ));
+        client.setSubscriptions(emptySet());
+        client.publishPacket(providerId, LAYER1, LARGE_PAYLOAD);
+
+        awaitTaskCompletion();
+        verifyNoPacketsReceived(mClientCallback1, providerId, LAYER1);
+        verifyNoPacketsReceived(mClientCallback2, providerId, LAYER1);
+    }
+
+    @Test
+    public void testPublishPacket_Large_LayerSubscriber_DifferentLayer() {
+        VmsClient client = connectVmsClient(mClientCallback1);
+        int providerId = client.registerProvider(PROVIDER_DESC1);
+        client.setProviderOfferings(providerId, asSet(
+                new VmsLayerDependency(LAYER1)
+        ));
+        connectVmsClient(mClientCallback2);
+
+        client.setSubscriptions(asSet(
+                new VmsAssociatedLayer(LAYER2, emptySet())
+        ));
+        client.publishPacket(providerId, LAYER1, LARGE_PAYLOAD);
+
+        awaitTaskCompletion();
+        verifyNoPacketsReceived(mClientCallback1, providerId, LAYER1);
+        verifyNoPacketsReceived(mClientCallback2, providerId, LAYER1);
+    }
+
+    @Test
+    public void testPublishPacket_Large_LayerAndProviderSubscriber() {
+        VmsClient client = connectVmsClient(mClientCallback1);
+        int providerId = client.registerProvider(PROVIDER_DESC1);
+        client.setProviderOfferings(providerId, asSet(
+                new VmsLayerDependency(LAYER1)
+        ));
+        connectVmsClient(mClientCallback2);
+
+        client.setSubscriptions(asSet(
+                new VmsAssociatedLayer(LAYER1, asSet(providerId))
+        ));
+        client.publishPacket(providerId, LAYER1, LARGE_PAYLOAD);
+
+        awaitTaskCompletion();
+        verifyPacketReceived(mClientCallback1, providerId, LAYER1, LARGE_PAYLOAD);
+        verifyNoPacketsReceived(mClientCallback2, providerId, LAYER1);
+    }
+
+    @Test
+    public void testPublishPacket_Large_LayerAndProviderSubscriber_Unsubscribe() {
+        VmsClient client = connectVmsClient(mClientCallback1);
+        int providerId = client.registerProvider(PROVIDER_DESC1);
+        client.setProviderOfferings(providerId, asSet(
+                new VmsLayerDependency(LAYER1)
+        ));
+        connectVmsClient(mClientCallback2);
+
+        client.setSubscriptions(asSet(
+                new VmsAssociatedLayer(LAYER1, asSet(providerId))
+        ));
+        client.setSubscriptions(emptySet());
+        client.publishPacket(providerId, LAYER1, LARGE_PAYLOAD);
+
+        verifyNoPacketsReceived(mClientCallback1, providerId, LAYER1);
+        verifyNoPacketsReceived(mClientCallback2, providerId, LAYER1);
+    }
+
+    @Test
+    public void testPublishPacket_Large_LayerAndProviderSubscriber_DifferentProvider() {
+        VmsClient client = connectVmsClient(mClientCallback1);
+        int providerId = client.registerProvider(PROVIDER_DESC1);
+        int providerId2 = client.registerProvider(PROVIDER_DESC2);
+        client.setProviderOfferings(providerId, asSet(
+                new VmsLayerDependency(LAYER1)
+        ));
+        connectVmsClient(mClientCallback2);
+
+        client.setSubscriptions(asSet(
+                new VmsAssociatedLayer(LAYER1, asSet(providerId2))
+        ));
+        client.publishPacket(providerId, LAYER1, LARGE_PAYLOAD);
+
+        awaitTaskCompletion();
+        verifyNoPacketsReceived(mClientCallback1, providerId, LAYER1);
+        verifyNoPacketsReceived(mClientCallback2, providerId, LAYER1);
+    }
+
     private VmsClient connectVmsClient(VmsClientCallback callback) {
         mClientManager.registerVmsClientCallback(mExecutor, callback);
         verify(callback, timeout(CONNECT_TIMEOUT)).onClientConnected(mClientCaptor.capture());
diff --git a/tests/carservice_unit_test/src/com/android/car/vms/VmsBrokerServiceTest.java b/tests/carservice_unit_test/src/com/android/car/vms/VmsBrokerServiceTest.java
index ffc3ac7..db9850c 100644
--- a/tests/carservice_unit_test/src/com/android/car/vms/VmsBrokerServiceTest.java
+++ b/tests/carservice_unit_test/src/com/android/car/vms/VmsBrokerServiceTest.java
@@ -20,9 +20,11 @@
 
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.atLeastOnce;
 import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
 import static org.mockito.Mockito.when;
 import static org.testng.Assert.assertThrows;
 
@@ -42,6 +44,7 @@
 import android.os.Binder;
 import android.os.IBinder;
 import android.os.RemoteException;
+import android.os.SharedMemory;
 import android.os.UserHandle;
 
 import androidx.test.filters.SmallTest;
@@ -49,11 +52,13 @@
 import com.android.car.stats.CarStatsService;
 import com.android.car.stats.VmsClientLogger;
 
+import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.mockito.ArgumentCaptor;
 import org.mockito.Mock;
+import org.mockito.Mockito;
 import org.mockito.junit.MockitoJUnitRunner;
 
 import java.util.Arrays;
@@ -89,6 +94,7 @@
     private static final VmsLayer LAYER3 = new VmsLayer(3, 1, 1);
 
     private static final byte[] PAYLOAD = {1, 2, 3, 4, 5, 6, 7, 8};
+    private static final int LARGE_PACKET_SIZE = 54321;
 
     @Mock
     private Context mContext;
@@ -114,9 +120,9 @@
     @Mock
     private VmsClientLogger mNoSubscribersLog;
 
-
     private final IBinder mClientToken1 = new Binder();
     private final IBinder mClientToken2 = new Binder();
+    private SharedMemory mLargePacket;
 
     private VmsNewBrokerService mBrokerService;
     private int mCallingAppUid;
@@ -139,6 +145,18 @@
         mCallingAppUid = TEST_APP_UID1;
     }
 
+    // Used by PublishLargePacket tests
+    private void setupLargePacket() throws Exception {
+        mLargePacket = Mockito.spy(SharedMemory.create("VmsBrokerServiceTest", LARGE_PACKET_SIZE));
+    }
+
+    @After
+    public void tearDown() {
+        if (mLargePacket != null) {
+            mLargePacket.close();
+        }
+    }
+
     @Test
     public void testRegister() {
         VmsRegistrationInfo registrationInfo =
@@ -2787,6 +2805,410 @@
         verifyPacketReceived(mClientCallback2, providerId, LAYER1, PAYLOAD);
     }
 
+    @Test
+    public void testPublishLargePacket_UnknownClient() throws Exception {
+        setupLargePacket();
+        registerClient(mClientToken1, mClientCallback1);
+        int providerId = mBrokerService.registerProvider(mClientToken1, PROVIDER_INFO1);
+
+        assertThrows(
+                IllegalStateException.class,
+                () -> mBrokerService.publishLargePacket(
+                        new Binder(), providerId, LAYER1, mLargePacket));
+
+        verify(mLargePacket, atLeastOnce()).getSize();
+        verify(mLargePacket).close();
+        verifyNoMoreInteractions(mLargePacket);
+    }
+
+    @Test
+    public void testPublishLargePacket_UnknownOffering() throws Exception {
+        setupLargePacket();
+        registerClient(mClientToken1, mClientCallback1);
+        int providerId = mBrokerService.registerProvider(mClientToken1, PROVIDER_INFO1);
+
+        assertThrows(
+                IllegalArgumentException.class,
+                () -> mBrokerService.publishLargePacket(
+                        mClientToken1, providerId, LAYER1, mLargePacket));
+
+        verify(mLargePacket, atLeastOnce()).getSize();
+        verify(mLargePacket).close();
+        verifyNoMoreInteractions(mLargePacket);
+    }
+
+    @Test
+    public void testPublishLargePacket_UnknownOffering_LegacyClient() throws Exception {
+        setupLargePacket();
+        mBrokerService.registerClient(mClientToken1, mClientCallback1, true);
+
+        mBrokerService.setSubscriptions(mClientToken1, asList(
+                new VmsAssociatedLayer(LAYER1, emptySet())
+        ));
+        mBrokerService.publishLargePacket(mClientToken1, 12345, LAYER1, mLargePacket);
+
+        verify(mClientLog1).logPacketSent(LAYER1, LARGE_PACKET_SIZE);
+        verify(mClientLog1).logPacketReceived(LAYER1, LARGE_PACKET_SIZE);
+        verifyLargePacketReceived(mClientCallback1, 12345, LAYER1, mLargePacket);
+
+        verify(mLargePacket, atLeastOnce()).getSize();
+        verify(mLargePacket).close();
+        verifyNoMoreInteractions(mLargePacket);
+    }
+
+    @Test
+    public void testPublishLargePacket_NoSubscribers() throws Exception {
+        setupLargePacket();
+        registerClient(mClientToken1, mClientCallback1);
+        int providerId = mBrokerService.registerProvider(mClientToken1, PROVIDER_INFO1);
+        mBrokerService.setProviderOfferings(mClientToken1, providerId, asList(
+                new VmsLayerDependency(LAYER1)
+        ));
+        registerClient(mClientToken2, mClientCallback2);
+
+        mBrokerService.publishLargePacket(mClientToken1, providerId, LAYER1, mLargePacket);
+
+        verify(mClientLog1).logPacketSent(LAYER1, LARGE_PACKET_SIZE);
+        verify(mNoSubscribersLog).logPacketDropped(LAYER1, LARGE_PACKET_SIZE);
+        verifyNoPacketsReceived(mClientCallback1, providerId, LAYER1);
+        verifyNoPacketsReceived(mClientCallback2, providerId, LAYER1);
+
+        verify(mLargePacket, atLeastOnce()).getSize();
+        verify(mLargePacket).close();
+        verifyNoMoreInteractions(mLargePacket);
+    }
+
+    @Test
+    public void testPublishLargePacket_MonitorSubscriber_Enabled() throws Exception {
+        setupLargePacket();
+        registerClient(mClientToken1, mClientCallback1);
+        int providerId = mBrokerService.registerProvider(mClientToken1, PROVIDER_INFO1);
+
+        mBrokerService.setProviderOfferings(mClientToken1, providerId, asList(
+                new VmsLayerDependency(LAYER1)
+        ));
+        registerClient(mClientToken2, mClientCallback2);
+
+        mBrokerService.setMonitoringEnabled(mClientToken1, true);
+        mBrokerService.publishLargePacket(mClientToken1, providerId, LAYER1, mLargePacket);
+
+        verify(mClientLog1).logPacketSent(LAYER1, LARGE_PACKET_SIZE);
+        verify(mClientLog1).logPacketReceived(LAYER1, LARGE_PACKET_SIZE);
+        verifyLargePacketReceived(mClientCallback1, providerId, LAYER1, mLargePacket);
+        verifyNoPacketsReceived(mClientCallback2, providerId, LAYER1);
+
+        verify(mLargePacket, atLeastOnce()).getSize();
+        verify(mLargePacket).close();
+        verifyNoMoreInteractions(mLargePacket);
+    }
+
+    @Test
+    public void testPublishLargePacket_MonitorSubscriber_EnabledAndDisabled() throws Exception {
+        setupLargePacket();
+        registerClient(mClientToken1, mClientCallback1);
+        int providerId = mBrokerService.registerProvider(mClientToken1, PROVIDER_INFO1);
+
+        mBrokerService.setProviderOfferings(mClientToken1, providerId, asList(
+                new VmsLayerDependency(LAYER1)
+        ));
+        registerClient(mClientToken2, mClientCallback2);
+
+        mBrokerService.setMonitoringEnabled(mClientToken1, true);
+        mBrokerService.setMonitoringEnabled(mClientToken1, false);
+        mBrokerService.publishLargePacket(mClientToken1, providerId, LAYER1, mLargePacket);
+
+        verify(mClientLog1).logPacketSent(LAYER1, LARGE_PACKET_SIZE);
+        verify(mNoSubscribersLog).logPacketDropped(LAYER1, LARGE_PACKET_SIZE);
+        verifyNoPacketsReceived(mClientCallback1, providerId, LAYER1);
+        verifyNoPacketsReceived(mClientCallback2, providerId, LAYER1);
+
+        verify(mLargePacket, atLeastOnce()).getSize();
+        verify(mLargePacket).close();
+        verifyNoMoreInteractions(mLargePacket);
+    }
+
+    @Test
+    public void testPublishLargePacket_LayerSubscriber() throws Exception {
+        setupLargePacket();
+        registerClient(mClientToken1, mClientCallback1);
+        int providerId = mBrokerService.registerProvider(mClientToken1, PROVIDER_INFO1);
+
+        mBrokerService.setProviderOfferings(mClientToken1, providerId, asList(
+                new VmsLayerDependency(LAYER1)
+        ));
+        registerClient(mClientToken2, mClientCallback2);
+
+        mBrokerService.setSubscriptions(mClientToken1, asList(
+                new VmsAssociatedLayer(LAYER1, emptySet())
+        ));
+        mBrokerService.publishLargePacket(mClientToken1, providerId, LAYER1, mLargePacket);
+
+        verify(mClientLog1).logPacketSent(LAYER1, LARGE_PACKET_SIZE);
+        verify(mClientLog1).logPacketReceived(LAYER1, LARGE_PACKET_SIZE);
+        verifyLargePacketReceived(mClientCallback1, providerId, LAYER1, mLargePacket);
+        verifyNoPacketsReceived(mClientCallback2, providerId, LAYER1);
+
+        verify(mLargePacket, atLeastOnce()).getSize();
+        verify(mLargePacket).close();
+        verifyNoMoreInteractions(mLargePacket);
+    }
+
+    @Test
+    public void testPublishLargePacket_LayerSubscriber_Unsubscribe() throws Exception {
+        setupLargePacket();
+        registerClient(mClientToken1, mClientCallback1);
+        int providerId = mBrokerService.registerProvider(mClientToken1, PROVIDER_INFO1);
+
+        mBrokerService.setProviderOfferings(mClientToken1, providerId, asList(
+                new VmsLayerDependency(LAYER1)
+        ));
+        registerClient(mClientToken2, mClientCallback2);
+
+        mBrokerService.setSubscriptions(mClientToken1, asList(
+                new VmsAssociatedLayer(LAYER1, emptySet())
+        ));
+        mBrokerService.setSubscriptions(mClientToken1, asList());
+        mBrokerService.publishLargePacket(mClientToken1, providerId, LAYER1, mLargePacket);
+
+        verify(mClientLog1).logPacketSent(LAYER1, LARGE_PACKET_SIZE);
+        verify(mNoSubscribersLog).logPacketDropped(LAYER1, LARGE_PACKET_SIZE);
+        verifyNoPacketsReceived(mClientCallback1, providerId, LAYER1);
+        verifyNoPacketsReceived(mClientCallback2, providerId, LAYER1);
+
+        verify(mLargePacket, atLeastOnce()).getSize();
+        verify(mLargePacket).close();
+        verifyNoMoreInteractions(mLargePacket);
+    }
+
+    @Test
+    public void testPublishLargePacket_LayerSubscriber_DifferentLayer() throws Exception {
+        setupLargePacket();
+        registerClient(mClientToken1, mClientCallback1);
+        int providerId = mBrokerService.registerProvider(mClientToken1, PROVIDER_INFO1);
+
+        mBrokerService.setProviderOfferings(mClientToken1, providerId, asList(
+                new VmsLayerDependency(LAYER1)
+        ));
+        registerClient(mClientToken2, mClientCallback2);
+
+        mBrokerService.setSubscriptions(mClientToken1, asList(
+                new VmsAssociatedLayer(LAYER2, emptySet())
+        ));
+        mBrokerService.publishLargePacket(mClientToken1, providerId, LAYER1, mLargePacket);
+
+        verify(mClientLog1).logPacketSent(LAYER1, LARGE_PACKET_SIZE);
+        verify(mNoSubscribersLog).logPacketDropped(LAYER1, LARGE_PACKET_SIZE);
+        verifyNoPacketsReceived(mClientCallback1, providerId, LAYER1);
+        verifyNoPacketsReceived(mClientCallback2, providerId, LAYER1);
+
+        verify(mLargePacket, atLeastOnce()).getSize();
+        verify(mLargePacket).close();
+        verifyNoMoreInteractions(mLargePacket);
+    }
+
+    @Test
+    public void testPublishLargePacket_MultipleLayerSubscribers() throws Exception {
+        setupLargePacket();
+        registerClient(mClientToken1, mClientCallback1);
+        int providerId = mBrokerService.registerProvider(mClientToken1, PROVIDER_INFO1);
+
+        mBrokerService.setProviderOfferings(mClientToken1, providerId, asList(
+                new VmsLayerDependency(LAYER1)
+        ));
+        registerClient(mClientToken2, mClientCallback2);
+
+        mBrokerService.setSubscriptions(mClientToken1, asList(
+                new VmsAssociatedLayer(LAYER1, emptySet())
+        ));
+        mBrokerService.setSubscriptions(mClientToken2, asList(
+                new VmsAssociatedLayer(LAYER1, emptySet())
+        ));
+        mBrokerService.publishLargePacket(mClientToken1, providerId, LAYER1, mLargePacket);
+
+        verify(mClientLog1).logPacketSent(LAYER1, LARGE_PACKET_SIZE);
+        verify(mClientLog1, times(2)).logPacketReceived(LAYER1, LARGE_PACKET_SIZE);
+        verifyLargePacketReceived(mClientCallback1, providerId, LAYER1, mLargePacket);
+        verifyLargePacketReceived(mClientCallback2, providerId, LAYER1, mLargePacket);
+
+        verify(mLargePacket, atLeastOnce()).getSize();
+        verify(mLargePacket).close();
+        verifyNoMoreInteractions(mLargePacket);
+    }
+
+    @Test
+    public void testPublishLargePacket_MultipleLayerSubscribers_DifferentProcesses()
+            throws Exception {
+        setupLargePacket();
+        registerClient(mClientToken1, mClientCallback1);
+        int providerId = mBrokerService.registerProvider(mClientToken1, PROVIDER_INFO1);
+
+        mBrokerService.setProviderOfferings(mClientToken1, providerId, asList(
+                new VmsLayerDependency(LAYER1)
+        ));
+        mCallingAppUid = TEST_APP_UID2;
+        registerClient(mClientToken2, mClientCallback2);
+
+        mBrokerService.setSubscriptions(mClientToken1, asList(
+                new VmsAssociatedLayer(LAYER1, emptySet())
+        ));
+        mBrokerService.setSubscriptions(mClientToken2, asList(
+                new VmsAssociatedLayer(LAYER1, emptySet())
+        ));
+        mBrokerService.publishLargePacket(mClientToken1, providerId, LAYER1, mLargePacket);
+
+        verify(mClientLog1).logPacketSent(LAYER1, LARGE_PACKET_SIZE);
+        verify(mClientLog1).logPacketReceived(LAYER1, LARGE_PACKET_SIZE);
+        verify(mClientLog2).logPacketReceived(LAYER1, LARGE_PACKET_SIZE);
+        verifyLargePacketReceived(mClientCallback1, providerId, LAYER1, mLargePacket);
+        verifyLargePacketReceived(mClientCallback2, providerId, LAYER1, mLargePacket);
+
+        verify(mLargePacket, atLeastOnce()).getSize();
+        verify(mLargePacket).close();
+        verifyNoMoreInteractions(mLargePacket);
+    }
+
+    @Test
+    public void testPublishLargePacket_LayerAndProviderSubscriber() throws Exception {
+        setupLargePacket();
+        registerClient(mClientToken1, mClientCallback1);
+        int providerId = mBrokerService.registerProvider(mClientToken1, PROVIDER_INFO1);
+
+        mBrokerService.setProviderOfferings(mClientToken1, providerId, asList(
+                new VmsLayerDependency(LAYER1)
+        ));
+        registerClient(mClientToken2, mClientCallback2);
+
+        mBrokerService.setSubscriptions(mClientToken1, asList(
+                new VmsAssociatedLayer(LAYER1, asSet(providerId))
+        ));
+        mBrokerService.publishLargePacket(mClientToken1, providerId, LAYER1, mLargePacket);
+
+        verify(mClientLog1).logPacketSent(LAYER1, LARGE_PACKET_SIZE);
+        verify(mClientLog1).logPacketReceived(LAYER1, LARGE_PACKET_SIZE);
+        verifyLargePacketReceived(mClientCallback1, providerId, LAYER1, mLargePacket);
+        verifyNoPacketsReceived(mClientCallback2, providerId, LAYER1);
+
+        verify(mLargePacket, atLeastOnce()).getSize();
+        verify(mLargePacket).close();
+        verifyNoMoreInteractions(mLargePacket);
+    }
+
+    @Test
+    public void testPublishLargePacket_LayerAndProviderSubscriber_Unsubscribe() throws Exception {
+        setupLargePacket();
+        registerClient(mClientToken1, mClientCallback1);
+        int providerId = mBrokerService.registerProvider(mClientToken1, PROVIDER_INFO1);
+
+        mBrokerService.setProviderOfferings(mClientToken1, providerId, asList(
+                new VmsLayerDependency(LAYER1)
+        ));
+        registerClient(mClientToken2, mClientCallback2);
+
+        mBrokerService.setSubscriptions(mClientToken1, asList(
+                new VmsAssociatedLayer(LAYER1, asSet(providerId))
+        ));
+        mBrokerService.setSubscriptions(mClientToken1, asList());
+        mBrokerService.publishLargePacket(mClientToken1, providerId, LAYER1, mLargePacket);
+
+        verify(mClientLog1).logPacketSent(LAYER1, LARGE_PACKET_SIZE);
+        verify(mNoSubscribersLog).logPacketDropped(LAYER1, LARGE_PACKET_SIZE);
+        verifyNoPacketsReceived(mClientCallback1, providerId, LAYER1);
+        verifyNoPacketsReceived(mClientCallback2, providerId, LAYER1);
+
+        verify(mLargePacket, atLeastOnce()).getSize();
+        verify(mLargePacket).close();
+        verifyNoMoreInteractions(mLargePacket);
+    }
+
+    @Test
+    public void testPublishLargePacket_LayerAndProviderSubscriber_DifferentProvider()
+            throws Exception {
+        setupLargePacket();
+        registerClient(mClientToken1, mClientCallback1);
+        int providerId = mBrokerService.registerProvider(mClientToken1, PROVIDER_INFO1);
+        int providerId2 = mBrokerService.registerProvider(mClientToken1, PROVIDER_INFO2);
+
+        mBrokerService.setProviderOfferings(mClientToken1, providerId, asList(
+                new VmsLayerDependency(LAYER1)
+        ));
+        registerClient(mClientToken2, mClientCallback2);
+
+        mBrokerService.setSubscriptions(mClientToken1, asList(
+                new VmsAssociatedLayer(LAYER1, asSet(providerId2))
+        ));
+        mBrokerService.publishLargePacket(mClientToken1, providerId, LAYER1, mLargePacket);
+
+        verify(mClientLog1).logPacketSent(LAYER1, LARGE_PACKET_SIZE);
+        verify(mNoSubscribersLog).logPacketDropped(LAYER1, LARGE_PACKET_SIZE);
+        verifyNoPacketsReceived(mClientCallback1, providerId, LAYER1);
+        verifyNoPacketsReceived(mClientCallback2, providerId, LAYER1);
+
+        verify(mLargePacket, atLeastOnce()).getSize();
+        verify(mLargePacket).close();
+        verifyNoMoreInteractions(mLargePacket);
+    }
+
+    @Test
+    public void testPublishLargePacket_MultipleLayerAndProviderSubscribers() throws Exception {
+        setupLargePacket();
+        registerClient(mClientToken1, mClientCallback1);
+        int providerId = mBrokerService.registerProvider(mClientToken1, PROVIDER_INFO1);
+
+        mBrokerService.setProviderOfferings(mClientToken1, providerId, asList(
+                new VmsLayerDependency(LAYER1)
+        ));
+        registerClient(mClientToken2, mClientCallback2);
+
+        mBrokerService.setSubscriptions(mClientToken1, asList(
+                new VmsAssociatedLayer(LAYER1, asSet(providerId))
+        ));
+        mBrokerService.setSubscriptions(mClientToken2, asList(
+                new VmsAssociatedLayer(LAYER1, asSet(providerId))
+        ));
+        mBrokerService.publishLargePacket(mClientToken1, providerId, LAYER1, mLargePacket);
+
+        verify(mClientLog1).logPacketSent(LAYER1, LARGE_PACKET_SIZE);
+        verify(mClientLog1, times(2)).logPacketReceived(LAYER1, LARGE_PACKET_SIZE);
+        verifyLargePacketReceived(mClientCallback1, providerId, LAYER1, mLargePacket);
+        verifyLargePacketReceived(mClientCallback2, providerId, LAYER1, mLargePacket);
+
+        verify(mLargePacket, atLeastOnce()).getSize();
+        verify(mLargePacket).close();
+        verifyNoMoreInteractions(mLargePacket);
+    }
+
+    @Test
+    public void testPublishLargePacket_MultipleLayerAndProviderSubscribers_DifferentProcesses()
+            throws Exception {
+        setupLargePacket();
+        registerClient(mClientToken1, mClientCallback1);
+        int providerId = mBrokerService.registerProvider(mClientToken1, PROVIDER_INFO1);
+
+        mBrokerService.setProviderOfferings(mClientToken1, providerId, asList(
+                new VmsLayerDependency(LAYER1)
+        ));
+        mCallingAppUid = TEST_APP_UID2;
+        registerClient(mClientToken2, mClientCallback2);
+
+        mBrokerService.setSubscriptions(mClientToken1, asList(
+                new VmsAssociatedLayer(LAYER1, asSet(providerId))
+        ));
+        mBrokerService.setSubscriptions(mClientToken2, asList(
+                new VmsAssociatedLayer(LAYER1, asSet(providerId))
+        ));
+        mBrokerService.publishLargePacket(mClientToken1, providerId, LAYER1, mLargePacket);
+
+        verify(mClientLog1).logPacketSent(LAYER1, LARGE_PACKET_SIZE);
+        verify(mClientLog1).logPacketReceived(LAYER1, LARGE_PACKET_SIZE);
+        verify(mClientLog2).logPacketReceived(LAYER1, LARGE_PACKET_SIZE);
+        verifyLargePacketReceived(mClientCallback1, providerId, LAYER1, mLargePacket);
+        verifyLargePacketReceived(mClientCallback2, providerId, LAYER1, mLargePacket);
+
+        verify(mLargePacket, atLeastOnce()).getSize();
+        verify(mLargePacket).close();
+        verifyNoMoreInteractions(mLargePacket);
+    }
+
     private void registerClient(IBinder token, IVmsClientCallback callback) {
         mBrokerService.registerClient(token, callback, false);
     }
@@ -2822,6 +3244,7 @@
             IVmsClientCallback callback,
             int providerId, VmsLayer layer) throws RemoteException {
         verify(callback, never()).onPacketReceived(eq(providerId), eq(layer), any());
+        verify(callback, never()).onLargePacketReceived(eq(providerId), eq(layer), any());
     }
 
     private static void verifyPacketReceived(
@@ -2829,6 +3252,11 @@
             int providerId, VmsLayer layer, byte[] payload) throws RemoteException {
         verify(callback).onPacketReceived(providerId, layer, payload);
     }
+    private static void verifyLargePacketReceived(
+            IVmsClientCallback callback,
+            int providerId, VmsLayer layer, SharedMemory packet) throws RemoteException {
+        verify(callback).onLargePacketReceived(providerId, layer, packet);
+    }
 
     private static <T> Set<T> asSet(T... values) {
         return new HashSet<T>(Arrays.asList(values));