Add removing subscribers and re-organize publishers
- Remove AbstractPublisher's callbacks and
directly implement addDataSubscriber() and other
methods in child classes. It gives more flexibility.
- Make VehiclePropertyPublisher thread-safe.
- Add more tests.
Bug: 189143814
Test: atest CarServiceUnitTest:VehiclePropertyPublisherTest
Change-Id: I061653477716badb83efbdd0f6b4fd7110997b5b
diff --git a/service/src/com/android/car/telemetry/databroker/DataSubscriber.java b/service/src/com/android/car/telemetry/databroker/DataSubscriber.java
index 0c3e9ff..4650309 100644
--- a/service/src/com/android/car/telemetry/databroker/DataSubscriber.java
+++ b/service/src/com/android/car/telemetry/databroker/DataSubscriber.java
@@ -27,6 +27,7 @@
* {@link com.android.car.telemetry.TelemetryProto.MetricsConfig} does not change during runtime.
* TODO(b/187743369): thread-safety can change if priority can be updated in runtime. Update
* javadoc once priority is concretely defined.
+ * Must be thread-safe, as #push() method may be called by multiple threads.
*/
public class DataSubscriber {
@@ -57,12 +58,12 @@
/**
* Creates a {@link ScriptExecutionTask} and pushes it to the priority queue where the task
* will be pending execution.
- * This method is thread-safe because {@link DataBroker#addTaskToQueue(ScriptExecutionTask)} is
- * thread-safe.
+ *
+ * <p>This method is thread-safe and doesn't block.
*/
public void push(Bundle data) {
- ScriptExecutionTask task = new ScriptExecutionTask(this, data,
- SystemClock.elapsedRealtime());
+ ScriptExecutionTask task = new ScriptExecutionTask(
+ this, data, SystemClock.elapsedRealtime());
mDataBroker.addTaskToQueue(task); // thread-safe
}
diff --git a/service/src/com/android/car/telemetry/publisher/AbstractPublisher.java b/service/src/com/android/car/telemetry/publisher/AbstractPublisher.java
index f58a6fa..1a2bb4a 100644
--- a/service/src/com/android/car/telemetry/publisher/AbstractPublisher.java
+++ b/service/src/com/android/car/telemetry/publisher/AbstractPublisher.java
@@ -18,10 +18,6 @@
import com.android.car.telemetry.databroker.DataSubscriber;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashSet;
-
/**
* Abstract class for publishers. It is 1-1 with data source and manages sending data to
* subscribers. Publisher stops itself when there are no subscribers.
@@ -30,54 +26,29 @@
* configuration. Single publisher instance can send data as several
* {@link com.android.car.telemetry.TelemetryProto.Publisher} to subscribers.
*
- * <p>Not thread-safe.
+ * <p>Child classes must be thread-safe.
*/
public abstract class AbstractPublisher {
- private final HashSet<DataSubscriber> mDataSubscribers = new HashSet<>();
-
/**
* Adds a subscriber that listens for data produced by this publisher.
*
* @param subscriber a subscriber to receive data
* @throws IllegalArgumentException if an invalid subscriber was provided.
*/
- public void addDataSubscriber(DataSubscriber subscriber) {
- // This method can throw exceptions if data is invalid - do now swap these 2 lines.
- onDataSubscriberAdded(subscriber);
- mDataSubscribers.add(subscriber);
- }
+ public abstract void addDataSubscriber(DataSubscriber subscriber);
/**
* Removes the subscriber from the publisher. Publisher stops if necessary.
*
* @throws IllegalArgumentException if the subscriber was not found.
*/
- public void removeDataSubscriber(DataSubscriber subscriber) {
- if (!mDataSubscribers.remove(subscriber)) {
- throw new IllegalArgumentException("Failed to remove, subscriber not found");
- }
- onDataSubscribersRemoved(Collections.singletonList(subscriber));
- }
+ public abstract void removeDataSubscriber(DataSubscriber subscriber);
/**
* Removes all the subscribers from the publisher. The publisher may stop.
*/
- public void removeAllDataSubscribers() {
- onDataSubscribersRemoved(mDataSubscribers);
- mDataSubscribers.clear();
- }
+ public abstract void removeAllDataSubscribers();
- /**
- * Called when a new subscriber is added to the publisher.
- *
- * @throws IllegalArgumentException if the invalid subscriber was provided.
- */
- protected abstract void onDataSubscriberAdded(DataSubscriber subscriber);
-
- /** Called when subscribers are removed from the publisher. */
- protected abstract void onDataSubscribersRemoved(Collection<DataSubscriber> subscribers);
-
- protected HashSet<DataSubscriber> getDataSubscribers() {
- return mDataSubscribers;
- }
+ /** Returns true if the publisher already has this data subscriber. */
+ public abstract boolean hasDataSubscriber(DataSubscriber subscriber);
}
diff --git a/service/src/com/android/car/telemetry/publisher/VehiclePropertyPublisher.java b/service/src/com/android/car/telemetry/publisher/VehiclePropertyPublisher.java
index ed0b19b..34edceb 100644
--- a/service/src/com/android/car/telemetry/publisher/VehiclePropertyPublisher.java
+++ b/service/src/com/android/car/telemetry/publisher/VehiclePropertyPublisher.java
@@ -22,22 +22,27 @@
import android.car.hardware.property.ICarPropertyEventListener;
import android.os.Bundle;
import android.os.RemoteException;
+import android.util.ArraySet;
import android.util.Slog;
import android.util.SparseArray;
import com.android.car.CarLog;
import com.android.car.CarPropertyService;
import com.android.car.telemetry.TelemetryProto;
+import com.android.car.telemetry.TelemetryProto.Publisher.PublisherCase;
import com.android.car.telemetry.databroker.DataSubscriber;
+import com.android.internal.annotations.GuardedBy;
import com.android.internal.util.Preconditions;
-import java.util.Collection;
import java.util.List;
/**
* Publisher for Vehicle Property changes, aka {@code CarPropertyService}.
*
- * <p>TODO(b/187525360): Add car property listener logic
+ * <p> When a subscriber is added, it registers a car property change listener for the
+ * property id of the subscriber and starts pushing the change events to the subscriber.
+ *
+ * <p>Thread-safe.
*/
public class VehiclePropertyPublisher extends AbstractPublisher {
private static final boolean DEBUG = false; // STOPSHIP if true
@@ -45,9 +50,20 @@
/** Bundle key for {@link CarPropertyEvent}. */
public static final String CAR_PROPERTY_EVENT_KEY = "car_property_event";
+ // Used to synchronize add/remove DataSubscriber and CarPropertyEvent listener calls.
+ private final Object mLock = new Object();
+
private final CarPropertyService mCarPropertyService;
+
+ // The class only reads, no need to synchronize this object.
private final SparseArray<CarPropertyConfig> mCarPropertyList;
+ // SparseArray and ArraySet are memory optimized, but they can be bit slower for more
+ // than 100 items. We're expecting much less number of subscribers, so these DS are ok.
+ @GuardedBy("mLock")
+ private final SparseArray<ArraySet<DataSubscriber>> mCarPropertyToSubscribers =
+ new SparseArray<>();
+
private final ICarPropertyEventListener mCarPropertyEventListener =
new ICarPropertyEventListener.Stub() {
@Override
@@ -65,14 +81,15 @@
public VehiclePropertyPublisher(CarPropertyService carPropertyService) {
mCarPropertyService = carPropertyService;
// Load car property list once, as the list doesn't change runtime.
- mCarPropertyList = new SparseArray<>();
- for (CarPropertyConfig property : mCarPropertyService.getPropertyList()) {
+ List<CarPropertyConfig> propertyList = mCarPropertyService.getPropertyList();
+ mCarPropertyList = new SparseArray<>(propertyList.size());
+ for (CarPropertyConfig property : propertyList) {
mCarPropertyList.append(property.getPropertyId(), property);
}
}
@Override
- protected void onDataSubscriberAdded(DataSubscriber subscriber) {
+ public void addDataSubscriber(DataSubscriber subscriber) {
TelemetryProto.Publisher publisherParam = subscriber.getPublisherParam();
Preconditions.checkArgument(
publisherParam.getPublisherCase()
@@ -88,31 +105,86 @@
|| config.getAccess()
== CarPropertyConfig.VEHICLE_PROPERTY_ACCESS_READ_WRITE,
"No access. Cannot read " + VehiclePropertyIds.toString(propertyId) + ".");
- mCarPropertyService.registerListener(
- propertyId,
- publisherParam.getVehicleProperty().getReadRate(),
- mCarPropertyEventListener);
+
+ synchronized (mLock) {
+ ArraySet<DataSubscriber> subscribers = mCarPropertyToSubscribers.get(propertyId);
+ if (subscribers == null) {
+ subscribers = new ArraySet<>();
+ mCarPropertyToSubscribers.put(propertyId, subscribers);
+ // Register the listener only once per propertyId.
+ mCarPropertyService.registerListener(
+ propertyId,
+ publisherParam.getVehicleProperty().getReadRate(),
+ mCarPropertyEventListener);
+ }
+ subscribers.add(subscriber);
+ }
}
@Override
- protected void onDataSubscribersRemoved(Collection<DataSubscriber> subscribers) {
- // TODO(b/190230611): Remove car property listener
+ public void removeDataSubscriber(DataSubscriber subscriber) {
+ TelemetryProto.Publisher publisherParam = subscriber.getPublisherParam();
+ Preconditions.checkArgument(
+ publisherParam.getPublisherCase() == PublisherCase.VEHICLE_PROPERTY,
+ "Subscribers only with VehicleProperty publisher are supported by this class.");
+ int propertyId = publisherParam.getVehicleProperty().getVehiclePropertyId();
+
+ synchronized (mLock) {
+ ArraySet<DataSubscriber> subscribers = mCarPropertyToSubscribers.get(propertyId);
+ if (subscribers == null || !subscribers.remove(subscriber)) {
+ throw new IllegalArgumentException("DataSubscriber was not found");
+ }
+ if (subscribers.isEmpty()) {
+ mCarPropertyToSubscribers.remove(propertyId);
+ // Doesn't throw exception as listener is not null. mCarPropertyService and
+ // local mCarPropertyToSubscribers will not get out of sync.
+ mCarPropertyService.unregisterListener(propertyId, mCarPropertyEventListener);
+ }
+ }
+ }
+
+ @Override
+ public void removeAllDataSubscribers() {
+ synchronized (mLock) {
+ for (int i = 0; i < mCarPropertyToSubscribers.size(); i++) {
+ int propertyId = mCarPropertyToSubscribers.keyAt(i);
+ // Doesn't throw exception as listener is not null. mCarPropertyService and
+ // local mCarPropertyToSubscribers will not get out of sync.
+ mCarPropertyService.unregisterListener(propertyId, mCarPropertyEventListener);
+ }
+ mCarPropertyToSubscribers.clear();
+ }
+ }
+
+ @Override
+ public boolean hasDataSubscriber(DataSubscriber subscriber) {
+ TelemetryProto.Publisher publisherParam = subscriber.getPublisherParam();
+ if (publisherParam.getPublisherCase() != PublisherCase.VEHICLE_PROPERTY) {
+ return false;
+ }
+ int propertyId = publisherParam.getVehicleProperty().getVehiclePropertyId();
+
+ synchronized (mLock) {
+ ArraySet<DataSubscriber> subscribers = mCarPropertyToSubscribers.get(propertyId);
+ return subscribers != null && subscribers.contains(subscriber);
+ }
}
/**
- * Called when publisher receives new events. It's called on CarPropertyService's worker
- * thread.
+ * Called when publisher receives new event. It's executed on a CarPropertyService's
+ * worker thread.
*/
private void onVehicleEvent(CarPropertyEvent event) {
Bundle bundle = new Bundle();
bundle.putParcelable(CAR_PROPERTY_EVENT_KEY, event);
- for (DataSubscriber subscriber : getDataSubscribers()) {
- TelemetryProto.Publisher publisherParam = subscriber.getPublisherParam();
- if (event.getCarPropertyValue().getPropertyId()
- != publisherParam.getVehicleProperty().getVehiclePropertyId()) {
- continue;
+
+ synchronized (mLock) {
+ ArraySet<DataSubscriber> subscribers =
+ mCarPropertyToSubscribers.get(event.getCarPropertyValue().getPropertyId());
+ // DataSubscriber#push() doesn't block.
+ for (DataSubscriber subscriber : subscribers) {
+ subscriber.push(bundle);
}
- subscriber.push(bundle);
}
}
}
diff --git a/tests/carservice_unit_test/src/com/android/car/telemetry/publisher/VehiclePropertyPublisherTest.java b/tests/carservice_unit_test/src/com/android/car/telemetry/publisher/VehiclePropertyPublisherTest.java
index bcd408a..dad880c 100644
--- a/tests/carservice_unit_test/src/com/android/car/telemetry/publisher/VehiclePropertyPublisherTest.java
+++ b/tests/carservice_unit_test/src/com/android/car/telemetry/publisher/VehiclePropertyPublisherTest.java
@@ -27,6 +27,7 @@
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@@ -108,7 +109,21 @@
mVehiclePropertyPublisher.addDataSubscriber(mMockDataSubscriber);
verify(mMockCarPropertyService).registerListener(eq(PROP_ID_1), eq(PROP_READ_RATE), any());
- assertThat(mVehiclePropertyPublisher.getDataSubscribers()).hasSize(1);
+ assertThat(mVehiclePropertyPublisher.hasDataSubscriber(mMockDataSubscriber)).isTrue();
+ }
+
+ @Test
+ public void testAddDataSubscriber_withSamePropertyId_registersSingleListener() {
+ DataSubscriber subscriber2 = mock(DataSubscriber.class);
+ when(subscriber2.getPublisherParam()).thenReturn(PUBLISHER_PARAMS_1);
+
+ mVehiclePropertyPublisher.addDataSubscriber(mMockDataSubscriber);
+ mVehiclePropertyPublisher.addDataSubscriber(subscriber2);
+
+ verify(mMockCarPropertyService, times(1))
+ .registerListener(eq(PROP_ID_1), eq(PROP_READ_RATE), any());
+ assertThat(mVehiclePropertyPublisher.hasDataSubscriber(mMockDataSubscriber)).isTrue();
+ assertThat(mVehiclePropertyPublisher.hasDataSubscriber(subscriber2)).isTrue();
}
@Test
@@ -125,7 +140,7 @@
() -> mVehiclePropertyPublisher.addDataSubscriber(invalidDataSubscriber));
assertThat(error).hasMessageThat().contains("No access.");
- assertThat(mVehiclePropertyPublisher.getDataSubscribers()).isEmpty();
+ assertThat(mVehiclePropertyPublisher.hasDataSubscriber(mMockDataSubscriber)).isFalse();
}
@Test
@@ -137,7 +152,7 @@
() -> mVehiclePropertyPublisher.addDataSubscriber(invalidDataSubscriber));
assertThat(error).hasMessageThat().contains("not found");
- assertThat(mVehiclePropertyPublisher.getDataSubscribers()).isEmpty();
+ assertThat(mVehiclePropertyPublisher.hasDataSubscriber(mMockDataSubscriber)).isFalse();
}
@Test
@@ -145,7 +160,9 @@
mVehiclePropertyPublisher.addDataSubscriber(mMockDataSubscriber);
mVehiclePropertyPublisher.removeDataSubscriber(mMockDataSubscriber);
- // TODO(b/189143814): add proper verification
+
+ verify(mMockCarPropertyService, times(1)).unregisterListener(eq(PROP_ID_1), any());
+ assertThat(mVehiclePropertyPublisher.hasDataSubscriber(mMockDataSubscriber)).isFalse();
}
@Test
@@ -153,13 +170,21 @@
Throwable error = assertThrows(IllegalArgumentException.class,
() -> mVehiclePropertyPublisher.removeDataSubscriber(mMockDataSubscriber));
- assertThat(error).hasMessageThat().contains("subscriber not found");
+ assertThat(error).hasMessageThat().contains("DataSubscriber was not found");
}
@Test
public void testRemoveAllDataSubscribers_succeeds() {
+ DataSubscriber subscriber2 = mock(DataSubscriber.class);
+ when(subscriber2.getPublisherParam()).thenReturn(PUBLISHER_PARAMS_1);
+ mVehiclePropertyPublisher.addDataSubscriber(mMockDataSubscriber);
+ mVehiclePropertyPublisher.addDataSubscriber(subscriber2);
+
mVehiclePropertyPublisher.removeAllDataSubscribers();
- // TODO(b/189143814): add tests
+
+ assertThat(mVehiclePropertyPublisher.hasDataSubscriber(mMockDataSubscriber)).isFalse();
+ assertThat(mVehiclePropertyPublisher.hasDataSubscriber(subscriber2)).isFalse();
+ verify(mMockCarPropertyService, times(1)).unregisterListener(eq(PROP_ID_1), any());
}
@Test