Use SharedMemory for transporting large packets.
Bug: 151326583
Test: atest CarServiceTest CarServiceUnitTest
Change-Id: Ifae8581d3d982697e6a23e203070c73906d80c18
diff --git a/car-lib/src/android/car/vms/IVmsBrokerService.aidl b/car-lib/src/android/car/vms/IVmsBrokerService.aidl
index cb70323..28b50a6 100644
--- a/car-lib/src/android/car/vms/IVmsBrokerService.aidl
+++ b/car-lib/src/android/car/vms/IVmsBrokerService.aidl
@@ -22,6 +22,7 @@
import android.car.vms.VmsLayerDependency;
import android.car.vms.VmsProviderInfo;
import android.car.vms.VmsRegistrationInfo;
+import android.os.SharedMemory;
/**
* Hidden API for communicating with the Vehicle Map Service message broker.
@@ -68,4 +69,10 @@
int providerId,
in VmsLayer layer,
in byte[] packet) = 7;
+
+ void publishLargePacket(
+ in IBinder token,
+ int providerId,
+ in VmsLayer layer,
+ in SharedMemory packet) = 8;
}
diff --git a/car-lib/src/android/car/vms/IVmsClientCallback.aidl b/car-lib/src/android/car/vms/IVmsClientCallback.aidl
index 5c8886d..c9b2f96 100644
--- a/car-lib/src/android/car/vms/IVmsClientCallback.aidl
+++ b/car-lib/src/android/car/vms/IVmsClientCallback.aidl
@@ -19,6 +19,7 @@
import android.car.vms.VmsAvailableLayers;
import android.car.vms.VmsLayer;
import android.car.vms.VmsSubscriptionState;
+import android.os.SharedMemory;
/**
* Hidden API for sending notifications to Vehicle Map Service clients.
@@ -36,4 +37,9 @@
int providerId,
in VmsLayer layer,
in byte[] packet) = 2;
+
+ void onLargePacketReceived(
+ int providerId,
+ in VmsLayer layer,
+ in SharedMemory packet) = 3;
}
diff --git a/car-lib/src/android/car/vms/VmsClient.java b/car-lib/src/android/car/vms/VmsClient.java
index b580ced..394ba67 100644
--- a/car-lib/src/android/car/vms/VmsClient.java
+++ b/car-lib/src/android/car/vms/VmsClient.java
@@ -16,6 +16,8 @@
package android.car.vms;
+import static android.system.OsConstants.PROT_READ;
+
import android.annotation.NonNull;
import android.annotation.Nullable;
import android.annotation.RequiresPermission;
@@ -25,11 +27,14 @@
import android.os.Binder;
import android.os.IBinder;
import android.os.RemoteException;
+import android.os.SharedMemory;
+import android.system.ErrnoException;
import android.util.Log;
import com.android.internal.annotations.GuardedBy;
import java.lang.ref.WeakReference;
+import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Objects;
@@ -54,6 +59,7 @@
new VmsAvailableLayers(Collections.emptySet(), 0);
private static final VmsSubscriptionState DEFAULT_SUBSCRIPTIONS =
new VmsSubscriptionState(0, Collections.emptySet(), Collections.emptySet());
+ private static final int LARGE_PACKET_THRESHOLD = 16 * 1024; // 16 KB
private final IVmsBrokerService mService;
private final Executor mExecutor;
@@ -236,11 +242,20 @@
*/
@RequiresPermission(Car.PERMISSION_VMS_PUBLISHER)
public void publishPacket(int providerId, @NonNull VmsLayer layer, @NonNull byte[] packet) {
- if (DBG) Log.d(TAG, "Publishing packet as " + providerId);
Objects.requireNonNull(layer, "layer cannot be null");
Objects.requireNonNull(packet, "packet cannot be null");
+ if (DBG) {
+ Log.d(TAG, "Publishing packet as " + providerId + " (" + packet.length + " bytes)");
+ }
try {
- mService.publishPacket(mClientToken, providerId, layer, packet);
+ if (packet.length < LARGE_PACKET_THRESHOLD) {
+ mService.publishPacket(mClientToken, providerId, layer, packet);
+ } else {
+ try (SharedMemory largePacket = packetToSharedMemory(packet)) {
+ mService.publishLargePacket(mClientToken, providerId, layer,
+ largePacket);
+ }
+ }
} catch (RemoteException e) {
Log.e(TAG, "While publishing packet as " + providerId);
mExceptionHandler.accept(e);
@@ -312,11 +327,27 @@
@Override
public void onPacketReceived(int providerId, VmsLayer layer, byte[] packet) {
- if (DBG) Log.d(TAG, "Received packet from " + providerId + " for: " + layer);
+ if (DBG) {
+ Log.d(TAG, "Received packet from " + providerId + " for: " + layer
+ + " (" + packet.length + " bytes)");
+ }
executeCallback((client, callback) ->
callback.onPacketReceived(providerId, layer, packet));
}
+ @Override
+ public void onLargePacketReceived(int providerId, VmsLayer layer, SharedMemory packet) {
+ try (SharedMemory largePacket = packet) {
+ if (DBG) {
+ Log.d(TAG, "Received large packet from " + providerId + " for: " + layer
+ + " (" + largePacket.getSize() + " bytes)");
+ }
+ byte[] packetData = sharedMemoryToPacket(largePacket);
+ executeCallback((client, callback) ->
+ callback.onPacketReceived(providerId, layer, packetData));
+ }
+ }
+
private void executeCallback(BiConsumer<VmsClient, VmsClientCallback> callbackOperation) {
final VmsClient client = mClient.get();
if (client == null) {
@@ -332,4 +363,51 @@
}
}
}
+
+ private static SharedMemory packetToSharedMemory(byte[] packet) {
+ SharedMemory shm;
+ try {
+ shm = SharedMemory.create("VmsClient", packet.length);
+ } catch (ErrnoException e) {
+ throw new IllegalStateException("Failed to allocate shared memory", e);
+ }
+
+ ByteBuffer buffer = null;
+ try {
+ buffer = shm.mapReadWrite();
+ buffer.put(packet);
+ } catch (ErrnoException e) {
+ shm.close();
+ throw new IllegalStateException("Failed to create write buffer", e);
+ } finally {
+ if (buffer != null) {
+ SharedMemory.unmap(buffer);
+ }
+ }
+
+ if (!shm.setProtect(PROT_READ)) {
+ shm.close();
+ throw new SecurityException("Failed to set read-only protection on shared memory");
+ }
+
+ return shm;
+ }
+
+ private static byte[] sharedMemoryToPacket(SharedMemory shm) {
+ ByteBuffer buffer;
+ try {
+ buffer = shm.mapReadOnly();
+ } catch (ErrnoException e) {
+ throw new IllegalStateException("Failed to create read buffer", e);
+ }
+
+ byte[] packet;
+ try {
+ packet = new byte[buffer.capacity()];
+ buffer.get(packet);
+ } finally {
+ SharedMemory.unmap(buffer);
+ }
+ return packet;
+ }
}
diff --git a/service/src/com/android/car/vms/VmsNewBrokerService.java b/service/src/com/android/car/vms/VmsNewBrokerService.java
index 8cbc33d..d956bd2 100644
--- a/service/src/com/android/car/vms/VmsNewBrokerService.java
+++ b/service/src/com/android/car/vms/VmsNewBrokerService.java
@@ -35,6 +35,7 @@
import android.os.Binder;
import android.os.IBinder;
import android.os.RemoteException;
+import android.os.SharedMemory;
import android.util.ArrayMap;
import android.util.ArraySet;
import android.util.Log;
@@ -46,6 +47,7 @@
import com.android.car.stats.VmsClientLogger;
import com.android.internal.annotations.GuardedBy;
import com.android.internal.annotations.VisibleForTesting;
+import com.android.internal.util.FunctionalUtils.ThrowingConsumer;
import java.io.PrintWriter;
import java.util.ArrayList;
@@ -195,13 +197,28 @@
@Override
public void publishPacket(IBinder clientToken, int providerId, VmsLayer layer, byte[] packet) {
assertVmsPublisherPermission(mContext);
+ deliverToSubscribers(clientToken, providerId, layer, packet.length,
+ callback -> callback.onPacketReceived(providerId, layer, packet));
+ }
+
+ @Override
+ public void publishLargePacket(IBinder clientToken, int providerId, VmsLayer layer,
+ SharedMemory packet) {
+ try (SharedMemory largePacket = packet) {
+ assertVmsPublisherPermission(mContext);
+ deliverToSubscribers(clientToken, providerId, layer, packet.getSize(),
+ callback -> callback.onLargePacketReceived(providerId, layer, largePacket));
+ }
+ }
+
+ private void deliverToSubscribers(IBinder clientToken, int providerId, VmsLayer layer,
+ int packetLength, ThrowingConsumer<IVmsClientCallback> callbackConsumer) {
VmsClientInfo client = getClient(clientToken);
if (!client.hasOffering(providerId, layer) && !client.isLegacyClient()) {
throw new IllegalArgumentException("Client does not offer " + layer + " as "
+ providerId);
}
- int packetLength = packet != null ? packet.length : 0;
mStatsService.getVmsClientLogger(client.getUid())
.logPacketSent(layer, packetLength);
@@ -222,14 +239,14 @@
for (VmsClientInfo subscriber : subscribers) {
try {
- subscriber.getCallback().onPacketReceived(providerId, layer, packet);
+ callbackConsumer.accept(subscriber.getCallback());
mStatsService.getVmsClientLogger(subscriber.getUid())
.logPacketReceived(layer, packetLength);
- } catch (RemoteException ex) {
+ } catch (RuntimeException e) {
mStatsService.getVmsClientLogger(subscriber.getUid())
.logPacketDropped(layer, packetLength);
Log.e(TAG, String.format("Unable to publish to listener: %s",
- subscriber.getPackageName()));
+ subscriber.getPackageName()), e);
}
}
}
diff --git a/tests/carservice_test/src/com/android/car/vms/VmsClientTest.java b/tests/carservice_test/src/com/android/car/vms/VmsClientTest.java
index 235e438..257d1ff 100644
--- a/tests/carservice_test/src/com/android/car/vms/VmsClientTest.java
+++ b/tests/carservice_test/src/com/android/car/vms/VmsClientTest.java
@@ -79,6 +79,7 @@
private static final VmsLayer LAYER3 = new VmsLayer(3, 1, 1);
private static final byte[] PAYLOAD = {1, 2, 3, 4, 5, 6, 7, 8};
+ private static final byte[] LARGE_PAYLOAD = new byte[16 * 1024]; // 16KB
@Rule
public MockitoRule mMockitoRule = MockitoJUnit.rule();
@@ -96,6 +97,7 @@
@Before
public void setUpTest() {
mClientManager = (VmsClientManager) getCar().getCarManager(Car.VEHICLE_MAP_SERVICE);
+ LARGE_PAYLOAD[0] = 123;
}
@Test
@@ -2378,6 +2380,183 @@
verifyPacketReceived(mClientCallback2, providerId, LAYER1, PAYLOAD);
}
+ @Test
+ public void testPublishPacket_Large_UnknownOffering() {
+ VmsClient client = connectVmsClient(mClientCallback1);
+ int providerId = client.registerProvider(PROVIDER_DESC1);
+
+ assertThrows(
+ IllegalArgumentException.class,
+ () -> client.publishPacket(providerId, LAYER1, LARGE_PAYLOAD));
+ }
+
+ @Test
+ public void testPublishPacket_Large_NoSubscribers() {
+ VmsClient client = connectVmsClient(mClientCallback1);
+ int providerId = client.registerProvider(PROVIDER_DESC1);
+ client.setProviderOfferings(providerId, asSet(
+ new VmsLayerDependency(LAYER1)
+ ));
+ connectVmsClient(mClientCallback2);
+
+ client.publishPacket(providerId, LAYER1, LARGE_PAYLOAD);
+
+ awaitTaskCompletion();
+ verifyNoPacketsReceived(mClientCallback1, providerId, LAYER1);
+ verifyNoPacketsReceived(mClientCallback2, providerId, LAYER1);
+ }
+
+ @Test
+ public void testPublishPacket_Large_MonitorSubscriber_Enabled() {
+ VmsClient client = connectVmsClient(mClientCallback1);
+ int providerId = client.registerProvider(PROVIDER_DESC1);
+ client.setProviderOfferings(providerId, asSet(
+ new VmsLayerDependency(LAYER1)
+ ));
+ connectVmsClient(mClientCallback2);
+
+ client.setMonitoringEnabled(true);
+ client.publishPacket(providerId, LAYER1, LARGE_PAYLOAD);
+
+ awaitTaskCompletion();
+ verifyPacketReceived(mClientCallback1, providerId, LAYER1, LARGE_PAYLOAD);
+ verifyNoPacketsReceived(mClientCallback2, providerId, LAYER1);
+ }
+
+ @Test
+ public void testPublishPacket_Large_MonitorSubscriber_EnabledAndDisabled() {
+ VmsClient client = connectVmsClient(mClientCallback1);
+ int providerId = client.registerProvider(PROVIDER_DESC1);
+ client.setProviderOfferings(providerId, asSet(
+ new VmsLayerDependency(LAYER1)
+ ));
+ connectVmsClient(mClientCallback2);
+
+ client.setMonitoringEnabled(true);
+ client.setMonitoringEnabled(false);
+ client.publishPacket(providerId, LAYER1, LARGE_PAYLOAD);
+
+ awaitTaskCompletion();
+ verifyNoPacketsReceived(mClientCallback1, providerId, LAYER1);
+ verifyNoPacketsReceived(mClientCallback2, providerId, LAYER1);
+ }
+
+ @Test
+ public void testPublishPacket_Large_LayerSubscriber() {
+ VmsClient client = connectVmsClient(mClientCallback1);
+ int providerId = client.registerProvider(PROVIDER_DESC1);
+ client.setProviderOfferings(providerId, asSet(
+ new VmsLayerDependency(LAYER1)
+ ));
+ connectVmsClient(mClientCallback2);
+
+ client.setSubscriptions(asSet(
+ new VmsAssociatedLayer(LAYER1, emptySet())
+ ));
+ client.publishPacket(providerId, LAYER1, LARGE_PAYLOAD);
+
+ awaitTaskCompletion();
+ verifyPacketReceived(mClientCallback1, providerId, LAYER1, LARGE_PAYLOAD);
+ verifyNoPacketsReceived(mClientCallback2, providerId, LAYER1);
+ }
+
+ @Test
+ public void testPublishPacket_Large_LayerSubscriber_Unsubscribe() {
+ VmsClient client = connectVmsClient(mClientCallback1);
+ int providerId = client.registerProvider(PROVIDER_DESC1);
+ client.setProviderOfferings(providerId, asSet(
+ new VmsLayerDependency(LAYER1)
+ ));
+ connectVmsClient(mClientCallback2);
+
+ client.setSubscriptions(asSet(
+ new VmsAssociatedLayer(LAYER1, emptySet())
+ ));
+ client.setSubscriptions(emptySet());
+ client.publishPacket(providerId, LAYER1, LARGE_PAYLOAD);
+
+ awaitTaskCompletion();
+ verifyNoPacketsReceived(mClientCallback1, providerId, LAYER1);
+ verifyNoPacketsReceived(mClientCallback2, providerId, LAYER1);
+ }
+
+ @Test
+ public void testPublishPacket_Large_LayerSubscriber_DifferentLayer() {
+ VmsClient client = connectVmsClient(mClientCallback1);
+ int providerId = client.registerProvider(PROVIDER_DESC1);
+ client.setProviderOfferings(providerId, asSet(
+ new VmsLayerDependency(LAYER1)
+ ));
+ connectVmsClient(mClientCallback2);
+
+ client.setSubscriptions(asSet(
+ new VmsAssociatedLayer(LAYER2, emptySet())
+ ));
+ client.publishPacket(providerId, LAYER1, LARGE_PAYLOAD);
+
+ awaitTaskCompletion();
+ verifyNoPacketsReceived(mClientCallback1, providerId, LAYER1);
+ verifyNoPacketsReceived(mClientCallback2, providerId, LAYER1);
+ }
+
+ @Test
+ public void testPublishPacket_Large_LayerAndProviderSubscriber() {
+ VmsClient client = connectVmsClient(mClientCallback1);
+ int providerId = client.registerProvider(PROVIDER_DESC1);
+ client.setProviderOfferings(providerId, asSet(
+ new VmsLayerDependency(LAYER1)
+ ));
+ connectVmsClient(mClientCallback2);
+
+ client.setSubscriptions(asSet(
+ new VmsAssociatedLayer(LAYER1, asSet(providerId))
+ ));
+ client.publishPacket(providerId, LAYER1, LARGE_PAYLOAD);
+
+ awaitTaskCompletion();
+ verifyPacketReceived(mClientCallback1, providerId, LAYER1, LARGE_PAYLOAD);
+ verifyNoPacketsReceived(mClientCallback2, providerId, LAYER1);
+ }
+
+ @Test
+ public void testPublishPacket_Large_LayerAndProviderSubscriber_Unsubscribe() {
+ VmsClient client = connectVmsClient(mClientCallback1);
+ int providerId = client.registerProvider(PROVIDER_DESC1);
+ client.setProviderOfferings(providerId, asSet(
+ new VmsLayerDependency(LAYER1)
+ ));
+ connectVmsClient(mClientCallback2);
+
+ client.setSubscriptions(asSet(
+ new VmsAssociatedLayer(LAYER1, asSet(providerId))
+ ));
+ client.setSubscriptions(emptySet());
+ client.publishPacket(providerId, LAYER1, LARGE_PAYLOAD);
+
+ verifyNoPacketsReceived(mClientCallback1, providerId, LAYER1);
+ verifyNoPacketsReceived(mClientCallback2, providerId, LAYER1);
+ }
+
+ @Test
+ public void testPublishPacket_Large_LayerAndProviderSubscriber_DifferentProvider() {
+ VmsClient client = connectVmsClient(mClientCallback1);
+ int providerId = client.registerProvider(PROVIDER_DESC1);
+ int providerId2 = client.registerProvider(PROVIDER_DESC2);
+ client.setProviderOfferings(providerId, asSet(
+ new VmsLayerDependency(LAYER1)
+ ));
+ connectVmsClient(mClientCallback2);
+
+ client.setSubscriptions(asSet(
+ new VmsAssociatedLayer(LAYER1, asSet(providerId2))
+ ));
+ client.publishPacket(providerId, LAYER1, LARGE_PAYLOAD);
+
+ awaitTaskCompletion();
+ verifyNoPacketsReceived(mClientCallback1, providerId, LAYER1);
+ verifyNoPacketsReceived(mClientCallback2, providerId, LAYER1);
+ }
+
private VmsClient connectVmsClient(VmsClientCallback callback) {
mClientManager.registerVmsClientCallback(mExecutor, callback);
verify(callback, timeout(CONNECT_TIMEOUT)).onClientConnected(mClientCaptor.capture());
diff --git a/tests/carservice_unit_test/src/com/android/car/vms/VmsBrokerServiceTest.java b/tests/carservice_unit_test/src/com/android/car/vms/VmsBrokerServiceTest.java
index ffc3ac7..db9850c 100644
--- a/tests/carservice_unit_test/src/com/android/car/vms/VmsBrokerServiceTest.java
+++ b/tests/carservice_unit_test/src/com/android/car/vms/VmsBrokerServiceTest.java
@@ -20,9 +20,11 @@
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.atLeastOnce;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.when;
import static org.testng.Assert.assertThrows;
@@ -42,6 +44,7 @@
import android.os.Binder;
import android.os.IBinder;
import android.os.RemoteException;
+import android.os.SharedMemory;
import android.os.UserHandle;
import androidx.test.filters.SmallTest;
@@ -49,11 +52,13 @@
import com.android.car.stats.CarStatsService;
import com.android.car.stats.VmsClientLogger;
+import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.ArgumentCaptor;
import org.mockito.Mock;
+import org.mockito.Mockito;
import org.mockito.junit.MockitoJUnitRunner;
import java.util.Arrays;
@@ -89,6 +94,7 @@
private static final VmsLayer LAYER3 = new VmsLayer(3, 1, 1);
private static final byte[] PAYLOAD = {1, 2, 3, 4, 5, 6, 7, 8};
+ private static final int LARGE_PACKET_SIZE = 54321;
@Mock
private Context mContext;
@@ -114,9 +120,9 @@
@Mock
private VmsClientLogger mNoSubscribersLog;
-
private final IBinder mClientToken1 = new Binder();
private final IBinder mClientToken2 = new Binder();
+ private SharedMemory mLargePacket;
private VmsNewBrokerService mBrokerService;
private int mCallingAppUid;
@@ -139,6 +145,18 @@
mCallingAppUid = TEST_APP_UID1;
}
+ // Used by PublishLargePacket tests
+ private void setupLargePacket() throws Exception {
+ mLargePacket = Mockito.spy(SharedMemory.create("VmsBrokerServiceTest", LARGE_PACKET_SIZE));
+ }
+
+ @After
+ public void tearDown() {
+ if (mLargePacket != null) {
+ mLargePacket.close();
+ }
+ }
+
@Test
public void testRegister() {
VmsRegistrationInfo registrationInfo =
@@ -2787,6 +2805,410 @@
verifyPacketReceived(mClientCallback2, providerId, LAYER1, PAYLOAD);
}
+ @Test
+ public void testPublishLargePacket_UnknownClient() throws Exception {
+ setupLargePacket();
+ registerClient(mClientToken1, mClientCallback1);
+ int providerId = mBrokerService.registerProvider(mClientToken1, PROVIDER_INFO1);
+
+ assertThrows(
+ IllegalStateException.class,
+ () -> mBrokerService.publishLargePacket(
+ new Binder(), providerId, LAYER1, mLargePacket));
+
+ verify(mLargePacket, atLeastOnce()).getSize();
+ verify(mLargePacket).close();
+ verifyNoMoreInteractions(mLargePacket);
+ }
+
+ @Test
+ public void testPublishLargePacket_UnknownOffering() throws Exception {
+ setupLargePacket();
+ registerClient(mClientToken1, mClientCallback1);
+ int providerId = mBrokerService.registerProvider(mClientToken1, PROVIDER_INFO1);
+
+ assertThrows(
+ IllegalArgumentException.class,
+ () -> mBrokerService.publishLargePacket(
+ mClientToken1, providerId, LAYER1, mLargePacket));
+
+ verify(mLargePacket, atLeastOnce()).getSize();
+ verify(mLargePacket).close();
+ verifyNoMoreInteractions(mLargePacket);
+ }
+
+ @Test
+ public void testPublishLargePacket_UnknownOffering_LegacyClient() throws Exception {
+ setupLargePacket();
+ mBrokerService.registerClient(mClientToken1, mClientCallback1, true);
+
+ mBrokerService.setSubscriptions(mClientToken1, asList(
+ new VmsAssociatedLayer(LAYER1, emptySet())
+ ));
+ mBrokerService.publishLargePacket(mClientToken1, 12345, LAYER1, mLargePacket);
+
+ verify(mClientLog1).logPacketSent(LAYER1, LARGE_PACKET_SIZE);
+ verify(mClientLog1).logPacketReceived(LAYER1, LARGE_PACKET_SIZE);
+ verifyLargePacketReceived(mClientCallback1, 12345, LAYER1, mLargePacket);
+
+ verify(mLargePacket, atLeastOnce()).getSize();
+ verify(mLargePacket).close();
+ verifyNoMoreInteractions(mLargePacket);
+ }
+
+ @Test
+ public void testPublishLargePacket_NoSubscribers() throws Exception {
+ setupLargePacket();
+ registerClient(mClientToken1, mClientCallback1);
+ int providerId = mBrokerService.registerProvider(mClientToken1, PROVIDER_INFO1);
+ mBrokerService.setProviderOfferings(mClientToken1, providerId, asList(
+ new VmsLayerDependency(LAYER1)
+ ));
+ registerClient(mClientToken2, mClientCallback2);
+
+ mBrokerService.publishLargePacket(mClientToken1, providerId, LAYER1, mLargePacket);
+
+ verify(mClientLog1).logPacketSent(LAYER1, LARGE_PACKET_SIZE);
+ verify(mNoSubscribersLog).logPacketDropped(LAYER1, LARGE_PACKET_SIZE);
+ verifyNoPacketsReceived(mClientCallback1, providerId, LAYER1);
+ verifyNoPacketsReceived(mClientCallback2, providerId, LAYER1);
+
+ verify(mLargePacket, atLeastOnce()).getSize();
+ verify(mLargePacket).close();
+ verifyNoMoreInteractions(mLargePacket);
+ }
+
+ @Test
+ public void testPublishLargePacket_MonitorSubscriber_Enabled() throws Exception {
+ setupLargePacket();
+ registerClient(mClientToken1, mClientCallback1);
+ int providerId = mBrokerService.registerProvider(mClientToken1, PROVIDER_INFO1);
+
+ mBrokerService.setProviderOfferings(mClientToken1, providerId, asList(
+ new VmsLayerDependency(LAYER1)
+ ));
+ registerClient(mClientToken2, mClientCallback2);
+
+ mBrokerService.setMonitoringEnabled(mClientToken1, true);
+ mBrokerService.publishLargePacket(mClientToken1, providerId, LAYER1, mLargePacket);
+
+ verify(mClientLog1).logPacketSent(LAYER1, LARGE_PACKET_SIZE);
+ verify(mClientLog1).logPacketReceived(LAYER1, LARGE_PACKET_SIZE);
+ verifyLargePacketReceived(mClientCallback1, providerId, LAYER1, mLargePacket);
+ verifyNoPacketsReceived(mClientCallback2, providerId, LAYER1);
+
+ verify(mLargePacket, atLeastOnce()).getSize();
+ verify(mLargePacket).close();
+ verifyNoMoreInteractions(mLargePacket);
+ }
+
+ @Test
+ public void testPublishLargePacket_MonitorSubscriber_EnabledAndDisabled() throws Exception {
+ setupLargePacket();
+ registerClient(mClientToken1, mClientCallback1);
+ int providerId = mBrokerService.registerProvider(mClientToken1, PROVIDER_INFO1);
+
+ mBrokerService.setProviderOfferings(mClientToken1, providerId, asList(
+ new VmsLayerDependency(LAYER1)
+ ));
+ registerClient(mClientToken2, mClientCallback2);
+
+ mBrokerService.setMonitoringEnabled(mClientToken1, true);
+ mBrokerService.setMonitoringEnabled(mClientToken1, false);
+ mBrokerService.publishLargePacket(mClientToken1, providerId, LAYER1, mLargePacket);
+
+ verify(mClientLog1).logPacketSent(LAYER1, LARGE_PACKET_SIZE);
+ verify(mNoSubscribersLog).logPacketDropped(LAYER1, LARGE_PACKET_SIZE);
+ verifyNoPacketsReceived(mClientCallback1, providerId, LAYER1);
+ verifyNoPacketsReceived(mClientCallback2, providerId, LAYER1);
+
+ verify(mLargePacket, atLeastOnce()).getSize();
+ verify(mLargePacket).close();
+ verifyNoMoreInteractions(mLargePacket);
+ }
+
+ @Test
+ public void testPublishLargePacket_LayerSubscriber() throws Exception {
+ setupLargePacket();
+ registerClient(mClientToken1, mClientCallback1);
+ int providerId = mBrokerService.registerProvider(mClientToken1, PROVIDER_INFO1);
+
+ mBrokerService.setProviderOfferings(mClientToken1, providerId, asList(
+ new VmsLayerDependency(LAYER1)
+ ));
+ registerClient(mClientToken2, mClientCallback2);
+
+ mBrokerService.setSubscriptions(mClientToken1, asList(
+ new VmsAssociatedLayer(LAYER1, emptySet())
+ ));
+ mBrokerService.publishLargePacket(mClientToken1, providerId, LAYER1, mLargePacket);
+
+ verify(mClientLog1).logPacketSent(LAYER1, LARGE_PACKET_SIZE);
+ verify(mClientLog1).logPacketReceived(LAYER1, LARGE_PACKET_SIZE);
+ verifyLargePacketReceived(mClientCallback1, providerId, LAYER1, mLargePacket);
+ verifyNoPacketsReceived(mClientCallback2, providerId, LAYER1);
+
+ verify(mLargePacket, atLeastOnce()).getSize();
+ verify(mLargePacket).close();
+ verifyNoMoreInteractions(mLargePacket);
+ }
+
+ @Test
+ public void testPublishLargePacket_LayerSubscriber_Unsubscribe() throws Exception {
+ setupLargePacket();
+ registerClient(mClientToken1, mClientCallback1);
+ int providerId = mBrokerService.registerProvider(mClientToken1, PROVIDER_INFO1);
+
+ mBrokerService.setProviderOfferings(mClientToken1, providerId, asList(
+ new VmsLayerDependency(LAYER1)
+ ));
+ registerClient(mClientToken2, mClientCallback2);
+
+ mBrokerService.setSubscriptions(mClientToken1, asList(
+ new VmsAssociatedLayer(LAYER1, emptySet())
+ ));
+ mBrokerService.setSubscriptions(mClientToken1, asList());
+ mBrokerService.publishLargePacket(mClientToken1, providerId, LAYER1, mLargePacket);
+
+ verify(mClientLog1).logPacketSent(LAYER1, LARGE_PACKET_SIZE);
+ verify(mNoSubscribersLog).logPacketDropped(LAYER1, LARGE_PACKET_SIZE);
+ verifyNoPacketsReceived(mClientCallback1, providerId, LAYER1);
+ verifyNoPacketsReceived(mClientCallback2, providerId, LAYER1);
+
+ verify(mLargePacket, atLeastOnce()).getSize();
+ verify(mLargePacket).close();
+ verifyNoMoreInteractions(mLargePacket);
+ }
+
+ @Test
+ public void testPublishLargePacket_LayerSubscriber_DifferentLayer() throws Exception {
+ setupLargePacket();
+ registerClient(mClientToken1, mClientCallback1);
+ int providerId = mBrokerService.registerProvider(mClientToken1, PROVIDER_INFO1);
+
+ mBrokerService.setProviderOfferings(mClientToken1, providerId, asList(
+ new VmsLayerDependency(LAYER1)
+ ));
+ registerClient(mClientToken2, mClientCallback2);
+
+ mBrokerService.setSubscriptions(mClientToken1, asList(
+ new VmsAssociatedLayer(LAYER2, emptySet())
+ ));
+ mBrokerService.publishLargePacket(mClientToken1, providerId, LAYER1, mLargePacket);
+
+ verify(mClientLog1).logPacketSent(LAYER1, LARGE_PACKET_SIZE);
+ verify(mNoSubscribersLog).logPacketDropped(LAYER1, LARGE_PACKET_SIZE);
+ verifyNoPacketsReceived(mClientCallback1, providerId, LAYER1);
+ verifyNoPacketsReceived(mClientCallback2, providerId, LAYER1);
+
+ verify(mLargePacket, atLeastOnce()).getSize();
+ verify(mLargePacket).close();
+ verifyNoMoreInteractions(mLargePacket);
+ }
+
+ @Test
+ public void testPublishLargePacket_MultipleLayerSubscribers() throws Exception {
+ setupLargePacket();
+ registerClient(mClientToken1, mClientCallback1);
+ int providerId = mBrokerService.registerProvider(mClientToken1, PROVIDER_INFO1);
+
+ mBrokerService.setProviderOfferings(mClientToken1, providerId, asList(
+ new VmsLayerDependency(LAYER1)
+ ));
+ registerClient(mClientToken2, mClientCallback2);
+
+ mBrokerService.setSubscriptions(mClientToken1, asList(
+ new VmsAssociatedLayer(LAYER1, emptySet())
+ ));
+ mBrokerService.setSubscriptions(mClientToken2, asList(
+ new VmsAssociatedLayer(LAYER1, emptySet())
+ ));
+ mBrokerService.publishLargePacket(mClientToken1, providerId, LAYER1, mLargePacket);
+
+ verify(mClientLog1).logPacketSent(LAYER1, LARGE_PACKET_SIZE);
+ verify(mClientLog1, times(2)).logPacketReceived(LAYER1, LARGE_PACKET_SIZE);
+ verifyLargePacketReceived(mClientCallback1, providerId, LAYER1, mLargePacket);
+ verifyLargePacketReceived(mClientCallback2, providerId, LAYER1, mLargePacket);
+
+ verify(mLargePacket, atLeastOnce()).getSize();
+ verify(mLargePacket).close();
+ verifyNoMoreInteractions(mLargePacket);
+ }
+
+ @Test
+ public void testPublishLargePacket_MultipleLayerSubscribers_DifferentProcesses()
+ throws Exception {
+ setupLargePacket();
+ registerClient(mClientToken1, mClientCallback1);
+ int providerId = mBrokerService.registerProvider(mClientToken1, PROVIDER_INFO1);
+
+ mBrokerService.setProviderOfferings(mClientToken1, providerId, asList(
+ new VmsLayerDependency(LAYER1)
+ ));
+ mCallingAppUid = TEST_APP_UID2;
+ registerClient(mClientToken2, mClientCallback2);
+
+ mBrokerService.setSubscriptions(mClientToken1, asList(
+ new VmsAssociatedLayer(LAYER1, emptySet())
+ ));
+ mBrokerService.setSubscriptions(mClientToken2, asList(
+ new VmsAssociatedLayer(LAYER1, emptySet())
+ ));
+ mBrokerService.publishLargePacket(mClientToken1, providerId, LAYER1, mLargePacket);
+
+ verify(mClientLog1).logPacketSent(LAYER1, LARGE_PACKET_SIZE);
+ verify(mClientLog1).logPacketReceived(LAYER1, LARGE_PACKET_SIZE);
+ verify(mClientLog2).logPacketReceived(LAYER1, LARGE_PACKET_SIZE);
+ verifyLargePacketReceived(mClientCallback1, providerId, LAYER1, mLargePacket);
+ verifyLargePacketReceived(mClientCallback2, providerId, LAYER1, mLargePacket);
+
+ verify(mLargePacket, atLeastOnce()).getSize();
+ verify(mLargePacket).close();
+ verifyNoMoreInteractions(mLargePacket);
+ }
+
+ @Test
+ public void testPublishLargePacket_LayerAndProviderSubscriber() throws Exception {
+ setupLargePacket();
+ registerClient(mClientToken1, mClientCallback1);
+ int providerId = mBrokerService.registerProvider(mClientToken1, PROVIDER_INFO1);
+
+ mBrokerService.setProviderOfferings(mClientToken1, providerId, asList(
+ new VmsLayerDependency(LAYER1)
+ ));
+ registerClient(mClientToken2, mClientCallback2);
+
+ mBrokerService.setSubscriptions(mClientToken1, asList(
+ new VmsAssociatedLayer(LAYER1, asSet(providerId))
+ ));
+ mBrokerService.publishLargePacket(mClientToken1, providerId, LAYER1, mLargePacket);
+
+ verify(mClientLog1).logPacketSent(LAYER1, LARGE_PACKET_SIZE);
+ verify(mClientLog1).logPacketReceived(LAYER1, LARGE_PACKET_SIZE);
+ verifyLargePacketReceived(mClientCallback1, providerId, LAYER1, mLargePacket);
+ verifyNoPacketsReceived(mClientCallback2, providerId, LAYER1);
+
+ verify(mLargePacket, atLeastOnce()).getSize();
+ verify(mLargePacket).close();
+ verifyNoMoreInteractions(mLargePacket);
+ }
+
+ @Test
+ public void testPublishLargePacket_LayerAndProviderSubscriber_Unsubscribe() throws Exception {
+ setupLargePacket();
+ registerClient(mClientToken1, mClientCallback1);
+ int providerId = mBrokerService.registerProvider(mClientToken1, PROVIDER_INFO1);
+
+ mBrokerService.setProviderOfferings(mClientToken1, providerId, asList(
+ new VmsLayerDependency(LAYER1)
+ ));
+ registerClient(mClientToken2, mClientCallback2);
+
+ mBrokerService.setSubscriptions(mClientToken1, asList(
+ new VmsAssociatedLayer(LAYER1, asSet(providerId))
+ ));
+ mBrokerService.setSubscriptions(mClientToken1, asList());
+ mBrokerService.publishLargePacket(mClientToken1, providerId, LAYER1, mLargePacket);
+
+ verify(mClientLog1).logPacketSent(LAYER1, LARGE_PACKET_SIZE);
+ verify(mNoSubscribersLog).logPacketDropped(LAYER1, LARGE_PACKET_SIZE);
+ verifyNoPacketsReceived(mClientCallback1, providerId, LAYER1);
+ verifyNoPacketsReceived(mClientCallback2, providerId, LAYER1);
+
+ verify(mLargePacket, atLeastOnce()).getSize();
+ verify(mLargePacket).close();
+ verifyNoMoreInteractions(mLargePacket);
+ }
+
+ @Test
+ public void testPublishLargePacket_LayerAndProviderSubscriber_DifferentProvider()
+ throws Exception {
+ setupLargePacket();
+ registerClient(mClientToken1, mClientCallback1);
+ int providerId = mBrokerService.registerProvider(mClientToken1, PROVIDER_INFO1);
+ int providerId2 = mBrokerService.registerProvider(mClientToken1, PROVIDER_INFO2);
+
+ mBrokerService.setProviderOfferings(mClientToken1, providerId, asList(
+ new VmsLayerDependency(LAYER1)
+ ));
+ registerClient(mClientToken2, mClientCallback2);
+
+ mBrokerService.setSubscriptions(mClientToken1, asList(
+ new VmsAssociatedLayer(LAYER1, asSet(providerId2))
+ ));
+ mBrokerService.publishLargePacket(mClientToken1, providerId, LAYER1, mLargePacket);
+
+ verify(mClientLog1).logPacketSent(LAYER1, LARGE_PACKET_SIZE);
+ verify(mNoSubscribersLog).logPacketDropped(LAYER1, LARGE_PACKET_SIZE);
+ verifyNoPacketsReceived(mClientCallback1, providerId, LAYER1);
+ verifyNoPacketsReceived(mClientCallback2, providerId, LAYER1);
+
+ verify(mLargePacket, atLeastOnce()).getSize();
+ verify(mLargePacket).close();
+ verifyNoMoreInteractions(mLargePacket);
+ }
+
+ @Test
+ public void testPublishLargePacket_MultipleLayerAndProviderSubscribers() throws Exception {
+ setupLargePacket();
+ registerClient(mClientToken1, mClientCallback1);
+ int providerId = mBrokerService.registerProvider(mClientToken1, PROVIDER_INFO1);
+
+ mBrokerService.setProviderOfferings(mClientToken1, providerId, asList(
+ new VmsLayerDependency(LAYER1)
+ ));
+ registerClient(mClientToken2, mClientCallback2);
+
+ mBrokerService.setSubscriptions(mClientToken1, asList(
+ new VmsAssociatedLayer(LAYER1, asSet(providerId))
+ ));
+ mBrokerService.setSubscriptions(mClientToken2, asList(
+ new VmsAssociatedLayer(LAYER1, asSet(providerId))
+ ));
+ mBrokerService.publishLargePacket(mClientToken1, providerId, LAYER1, mLargePacket);
+
+ verify(mClientLog1).logPacketSent(LAYER1, LARGE_PACKET_SIZE);
+ verify(mClientLog1, times(2)).logPacketReceived(LAYER1, LARGE_PACKET_SIZE);
+ verifyLargePacketReceived(mClientCallback1, providerId, LAYER1, mLargePacket);
+ verifyLargePacketReceived(mClientCallback2, providerId, LAYER1, mLargePacket);
+
+ verify(mLargePacket, atLeastOnce()).getSize();
+ verify(mLargePacket).close();
+ verifyNoMoreInteractions(mLargePacket);
+ }
+
+ @Test
+ public void testPublishLargePacket_MultipleLayerAndProviderSubscribers_DifferentProcesses()
+ throws Exception {
+ setupLargePacket();
+ registerClient(mClientToken1, mClientCallback1);
+ int providerId = mBrokerService.registerProvider(mClientToken1, PROVIDER_INFO1);
+
+ mBrokerService.setProviderOfferings(mClientToken1, providerId, asList(
+ new VmsLayerDependency(LAYER1)
+ ));
+ mCallingAppUid = TEST_APP_UID2;
+ registerClient(mClientToken2, mClientCallback2);
+
+ mBrokerService.setSubscriptions(mClientToken1, asList(
+ new VmsAssociatedLayer(LAYER1, asSet(providerId))
+ ));
+ mBrokerService.setSubscriptions(mClientToken2, asList(
+ new VmsAssociatedLayer(LAYER1, asSet(providerId))
+ ));
+ mBrokerService.publishLargePacket(mClientToken1, providerId, LAYER1, mLargePacket);
+
+ verify(mClientLog1).logPacketSent(LAYER1, LARGE_PACKET_SIZE);
+ verify(mClientLog1).logPacketReceived(LAYER1, LARGE_PACKET_SIZE);
+ verify(mClientLog2).logPacketReceived(LAYER1, LARGE_PACKET_SIZE);
+ verifyLargePacketReceived(mClientCallback1, providerId, LAYER1, mLargePacket);
+ verifyLargePacketReceived(mClientCallback2, providerId, LAYER1, mLargePacket);
+
+ verify(mLargePacket, atLeastOnce()).getSize();
+ verify(mLargePacket).close();
+ verifyNoMoreInteractions(mLargePacket);
+ }
+
private void registerClient(IBinder token, IVmsClientCallback callback) {
mBrokerService.registerClient(token, callback, false);
}
@@ -2822,6 +3244,7 @@
IVmsClientCallback callback,
int providerId, VmsLayer layer) throws RemoteException {
verify(callback, never()).onPacketReceived(eq(providerId), eq(layer), any());
+ verify(callback, never()).onLargePacketReceived(eq(providerId), eq(layer), any());
}
private static void verifyPacketReceived(
@@ -2829,6 +3252,11 @@
int providerId, VmsLayer layer, byte[] payload) throws RemoteException {
verify(callback).onPacketReceived(providerId, layer, payload);
}
+ private static void verifyLargePacketReceived(
+ IVmsClientCallback callback,
+ int providerId, VmsLayer layer, SharedMemory packet) throws RemoteException {
+ verify(callback).onLargePacketReceived(providerId, layer, packet);
+ }
private static <T> Set<T> asSet(T... values) {
return new HashSet<T>(Arrays.asList(values));