Add removing subscribers and re-organize publishers

- Remove AbstractPublisher's callbacks and
  directly implement addDataSubscriber() and other
  methods in child classes. It gives more flexibility.
- Make VehiclePropertyPublisher thread-safe.
- Add more tests.

Bug: 189143814
Test: atest CarServiceUnitTest:VehiclePropertyPublisherTest
Change-Id: I061653477716badb83efbdd0f6b4fd7110997b5b
diff --git a/service/src/com/android/car/telemetry/databroker/DataSubscriber.java b/service/src/com/android/car/telemetry/databroker/DataSubscriber.java
index 0c3e9ff..4650309 100644
--- a/service/src/com/android/car/telemetry/databroker/DataSubscriber.java
+++ b/service/src/com/android/car/telemetry/databroker/DataSubscriber.java
@@ -27,6 +27,7 @@
  * {@link com.android.car.telemetry.TelemetryProto.MetricsConfig} does not change during runtime.
  * TODO(b/187743369): thread-safety can change if priority can be updated in runtime. Update
  *                    javadoc once priority is concretely defined.
+ *                    Must be thread-safe, as #push() method may be called by multiple threads.
  */
 public class DataSubscriber {
 
@@ -57,12 +58,12 @@
     /**
      * Creates a {@link ScriptExecutionTask} and pushes it to the priority queue where the task
      * will be pending execution.
-     * This method is thread-safe because {@link DataBroker#addTaskToQueue(ScriptExecutionTask)} is
-     * thread-safe.
+     *
+     * <p>This method is thread-safe and doesn't block.
      */
     public void push(Bundle data) {
-        ScriptExecutionTask task = new ScriptExecutionTask(this, data,
-                SystemClock.elapsedRealtime());
+        ScriptExecutionTask task = new ScriptExecutionTask(
+                this, data, SystemClock.elapsedRealtime());
         mDataBroker.addTaskToQueue(task); // thread-safe
     }
 
diff --git a/service/src/com/android/car/telemetry/publisher/AbstractPublisher.java b/service/src/com/android/car/telemetry/publisher/AbstractPublisher.java
index f58a6fa..1a2bb4a 100644
--- a/service/src/com/android/car/telemetry/publisher/AbstractPublisher.java
+++ b/service/src/com/android/car/telemetry/publisher/AbstractPublisher.java
@@ -18,10 +18,6 @@
 
 import com.android.car.telemetry.databroker.DataSubscriber;
 
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashSet;
-
 /**
  * Abstract class for publishers. It is 1-1 with data source and manages sending data to
  * subscribers. Publisher stops itself when there are no subscribers.
@@ -30,54 +26,29 @@
  * configuration. Single publisher instance can send data as several
  * {@link com.android.car.telemetry.TelemetryProto.Publisher} to subscribers.
  *
- * <p>Not thread-safe.
+ * <p>Child classes must be thread-safe.
  */
 public abstract class AbstractPublisher {
-    private final HashSet<DataSubscriber> mDataSubscribers = new HashSet<>();
-
     /**
      * Adds a subscriber that listens for data produced by this publisher.
      *
      * @param subscriber a subscriber to receive data
      * @throws IllegalArgumentException if an invalid subscriber was provided.
      */
-    public void addDataSubscriber(DataSubscriber subscriber) {
-        // This method can throw exceptions if data is invalid - do now swap these 2 lines.
-        onDataSubscriberAdded(subscriber);
-        mDataSubscribers.add(subscriber);
-    }
+    public abstract void addDataSubscriber(DataSubscriber subscriber);
 
     /**
      * Removes the subscriber from the publisher. Publisher stops if necessary.
      *
      * @throws IllegalArgumentException if the subscriber was not found.
      */
-    public void removeDataSubscriber(DataSubscriber subscriber) {
-        if (!mDataSubscribers.remove(subscriber)) {
-            throw new IllegalArgumentException("Failed to remove, subscriber not found");
-        }
-        onDataSubscribersRemoved(Collections.singletonList(subscriber));
-    }
+    public abstract void removeDataSubscriber(DataSubscriber subscriber);
 
     /**
      * Removes all the subscribers from the publisher. The publisher may stop.
      */
-    public void removeAllDataSubscribers() {
-        onDataSubscribersRemoved(mDataSubscribers);
-        mDataSubscribers.clear();
-    }
+    public abstract void removeAllDataSubscribers();
 
-    /**
-     * Called when a new subscriber is added to the publisher.
-     *
-     * @throws IllegalArgumentException if the invalid subscriber was provided.
-     */
-    protected abstract void onDataSubscriberAdded(DataSubscriber subscriber);
-
-    /** Called when subscribers are removed from the publisher. */
-    protected abstract void onDataSubscribersRemoved(Collection<DataSubscriber> subscribers);
-
-    protected HashSet<DataSubscriber> getDataSubscribers() {
-        return mDataSubscribers;
-    }
+    /** Returns true if the publisher already has this data subscriber. */
+    public abstract boolean hasDataSubscriber(DataSubscriber subscriber);
 }
diff --git a/service/src/com/android/car/telemetry/publisher/VehiclePropertyPublisher.java b/service/src/com/android/car/telemetry/publisher/VehiclePropertyPublisher.java
index ed0b19b..34edceb 100644
--- a/service/src/com/android/car/telemetry/publisher/VehiclePropertyPublisher.java
+++ b/service/src/com/android/car/telemetry/publisher/VehiclePropertyPublisher.java
@@ -22,22 +22,27 @@
 import android.car.hardware.property.ICarPropertyEventListener;
 import android.os.Bundle;
 import android.os.RemoteException;
+import android.util.ArraySet;
 import android.util.Slog;
 import android.util.SparseArray;
 
 import com.android.car.CarLog;
 import com.android.car.CarPropertyService;
 import com.android.car.telemetry.TelemetryProto;
+import com.android.car.telemetry.TelemetryProto.Publisher.PublisherCase;
 import com.android.car.telemetry.databroker.DataSubscriber;
+import com.android.internal.annotations.GuardedBy;
 import com.android.internal.util.Preconditions;
 
-import java.util.Collection;
 import java.util.List;
 
 /**
  * Publisher for Vehicle Property changes, aka {@code CarPropertyService}.
  *
- * <p>TODO(b/187525360): Add car property listener logic
+ * <p> When a subscriber is added, it registers a car property change listener for the
+ * property id of the subscriber and starts pushing the change events to the subscriber.
+ *
+ * <p>Thread-safe.
  */
 public class VehiclePropertyPublisher extends AbstractPublisher {
     private static final boolean DEBUG = false;  // STOPSHIP if true
@@ -45,9 +50,20 @@
     /** Bundle key for {@link CarPropertyEvent}. */
     public static final String CAR_PROPERTY_EVENT_KEY = "car_property_event";
 
+    // Used to synchronize add/remove DataSubscriber and CarPropertyEvent listener calls.
+    private final Object mLock = new Object();
+
     private final CarPropertyService mCarPropertyService;
+
+    // The class only reads, no need to synchronize this object.
     private final SparseArray<CarPropertyConfig> mCarPropertyList;
 
+    // SparseArray and ArraySet are memory optimized, but they can be bit slower for more
+    // than 100 items. We're expecting much less number of subscribers, so these DS are ok.
+    @GuardedBy("mLock")
+    private final SparseArray<ArraySet<DataSubscriber>> mCarPropertyToSubscribers =
+            new SparseArray<>();
+
     private final ICarPropertyEventListener mCarPropertyEventListener =
             new ICarPropertyEventListener.Stub() {
                 @Override
@@ -65,14 +81,15 @@
     public VehiclePropertyPublisher(CarPropertyService carPropertyService) {
         mCarPropertyService = carPropertyService;
         // Load car property list once, as the list doesn't change runtime.
-        mCarPropertyList = new SparseArray<>();
-        for (CarPropertyConfig property : mCarPropertyService.getPropertyList()) {
+        List<CarPropertyConfig> propertyList = mCarPropertyService.getPropertyList();
+        mCarPropertyList = new SparseArray<>(propertyList.size());
+        for (CarPropertyConfig property : propertyList) {
             mCarPropertyList.append(property.getPropertyId(), property);
         }
     }
 
     @Override
-    protected void onDataSubscriberAdded(DataSubscriber subscriber) {
+    public void addDataSubscriber(DataSubscriber subscriber) {
         TelemetryProto.Publisher publisherParam = subscriber.getPublisherParam();
         Preconditions.checkArgument(
                 publisherParam.getPublisherCase()
@@ -88,31 +105,86 @@
                         || config.getAccess()
                         == CarPropertyConfig.VEHICLE_PROPERTY_ACCESS_READ_WRITE,
                 "No access. Cannot read " + VehiclePropertyIds.toString(propertyId) + ".");
-        mCarPropertyService.registerListener(
-                propertyId,
-                publisherParam.getVehicleProperty().getReadRate(),
-                mCarPropertyEventListener);
+
+        synchronized (mLock) {
+            ArraySet<DataSubscriber> subscribers = mCarPropertyToSubscribers.get(propertyId);
+            if (subscribers == null) {
+                subscribers = new ArraySet<>();
+                mCarPropertyToSubscribers.put(propertyId, subscribers);
+                // Register the listener only once per propertyId.
+                mCarPropertyService.registerListener(
+                        propertyId,
+                        publisherParam.getVehicleProperty().getReadRate(),
+                        mCarPropertyEventListener);
+            }
+            subscribers.add(subscriber);
+        }
     }
 
     @Override
-    protected void onDataSubscribersRemoved(Collection<DataSubscriber> subscribers) {
-        // TODO(b/190230611): Remove car property listener
+    public void removeDataSubscriber(DataSubscriber subscriber) {
+        TelemetryProto.Publisher publisherParam = subscriber.getPublisherParam();
+        Preconditions.checkArgument(
+                publisherParam.getPublisherCase() == PublisherCase.VEHICLE_PROPERTY,
+                "Subscribers only with VehicleProperty publisher are supported by this class.");
+        int propertyId = publisherParam.getVehicleProperty().getVehiclePropertyId();
+
+        synchronized (mLock) {
+            ArraySet<DataSubscriber> subscribers = mCarPropertyToSubscribers.get(propertyId);
+            if (subscribers == null || !subscribers.remove(subscriber)) {
+                throw new IllegalArgumentException("DataSubscriber was not found");
+            }
+            if (subscribers.isEmpty()) {
+                mCarPropertyToSubscribers.remove(propertyId);
+                // Doesn't throw exception as listener is not null. mCarPropertyService and
+                // local mCarPropertyToSubscribers will not get out of sync.
+                mCarPropertyService.unregisterListener(propertyId, mCarPropertyEventListener);
+            }
+        }
+    }
+
+    @Override
+    public void removeAllDataSubscribers() {
+        synchronized (mLock) {
+            for (int i = 0; i < mCarPropertyToSubscribers.size(); i++) {
+                int propertyId = mCarPropertyToSubscribers.keyAt(i);
+                // Doesn't throw exception as listener is not null. mCarPropertyService and
+                // local mCarPropertyToSubscribers will not get out of sync.
+                mCarPropertyService.unregisterListener(propertyId, mCarPropertyEventListener);
+            }
+            mCarPropertyToSubscribers.clear();
+        }
+    }
+
+    @Override
+    public boolean hasDataSubscriber(DataSubscriber subscriber) {
+        TelemetryProto.Publisher publisherParam = subscriber.getPublisherParam();
+        if (publisherParam.getPublisherCase() != PublisherCase.VEHICLE_PROPERTY) {
+            return false;
+        }
+        int propertyId = publisherParam.getVehicleProperty().getVehiclePropertyId();
+
+        synchronized (mLock) {
+            ArraySet<DataSubscriber> subscribers = mCarPropertyToSubscribers.get(propertyId);
+            return subscribers != null && subscribers.contains(subscriber);
+        }
     }
 
     /**
-     * Called when publisher receives new events. It's called on CarPropertyService's worker
-     * thread.
+     * Called when publisher receives new event. It's executed on a CarPropertyService's
+     * worker thread.
      */
     private void onVehicleEvent(CarPropertyEvent event) {
         Bundle bundle = new Bundle();
         bundle.putParcelable(CAR_PROPERTY_EVENT_KEY, event);
-        for (DataSubscriber subscriber : getDataSubscribers()) {
-            TelemetryProto.Publisher publisherParam = subscriber.getPublisherParam();
-            if (event.getCarPropertyValue().getPropertyId()
-                    != publisherParam.getVehicleProperty().getVehiclePropertyId()) {
-                continue;
+
+        synchronized (mLock) {
+            ArraySet<DataSubscriber> subscribers =
+                    mCarPropertyToSubscribers.get(event.getCarPropertyValue().getPropertyId());
+            // DataSubscriber#push() doesn't block.
+            for (DataSubscriber subscriber : subscribers) {
+                subscriber.push(bundle);
             }
-            subscriber.push(bundle);
         }
     }
 }
diff --git a/tests/carservice_unit_test/src/com/android/car/telemetry/publisher/VehiclePropertyPublisherTest.java b/tests/carservice_unit_test/src/com/android/car/telemetry/publisher/VehiclePropertyPublisherTest.java
index bcd408a..dad880c 100644
--- a/tests/carservice_unit_test/src/com/android/car/telemetry/publisher/VehiclePropertyPublisherTest.java
+++ b/tests/carservice_unit_test/src/com/android/car/telemetry/publisher/VehiclePropertyPublisherTest.java
@@ -27,6 +27,7 @@
 import static org.mockito.ArgumentMatchers.eq;
 import static org.mockito.Mockito.doNothing;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
@@ -108,7 +109,21 @@
         mVehiclePropertyPublisher.addDataSubscriber(mMockDataSubscriber);
 
         verify(mMockCarPropertyService).registerListener(eq(PROP_ID_1), eq(PROP_READ_RATE), any());
-        assertThat(mVehiclePropertyPublisher.getDataSubscribers()).hasSize(1);
+        assertThat(mVehiclePropertyPublisher.hasDataSubscriber(mMockDataSubscriber)).isTrue();
+    }
+
+    @Test
+    public void testAddDataSubscriber_withSamePropertyId_registersSingleListener() {
+        DataSubscriber subscriber2 = mock(DataSubscriber.class);
+        when(subscriber2.getPublisherParam()).thenReturn(PUBLISHER_PARAMS_1);
+
+        mVehiclePropertyPublisher.addDataSubscriber(mMockDataSubscriber);
+        mVehiclePropertyPublisher.addDataSubscriber(subscriber2);
+
+        verify(mMockCarPropertyService, times(1))
+                .registerListener(eq(PROP_ID_1), eq(PROP_READ_RATE), any());
+        assertThat(mVehiclePropertyPublisher.hasDataSubscriber(mMockDataSubscriber)).isTrue();
+        assertThat(mVehiclePropertyPublisher.hasDataSubscriber(subscriber2)).isTrue();
     }
 
     @Test
@@ -125,7 +140,7 @@
                 () -> mVehiclePropertyPublisher.addDataSubscriber(invalidDataSubscriber));
 
         assertThat(error).hasMessageThat().contains("No access.");
-        assertThat(mVehiclePropertyPublisher.getDataSubscribers()).isEmpty();
+        assertThat(mVehiclePropertyPublisher.hasDataSubscriber(mMockDataSubscriber)).isFalse();
     }
 
     @Test
@@ -137,7 +152,7 @@
                 () -> mVehiclePropertyPublisher.addDataSubscriber(invalidDataSubscriber));
 
         assertThat(error).hasMessageThat().contains("not found");
-        assertThat(mVehiclePropertyPublisher.getDataSubscribers()).isEmpty();
+        assertThat(mVehiclePropertyPublisher.hasDataSubscriber(mMockDataSubscriber)).isFalse();
     }
 
     @Test
@@ -145,7 +160,9 @@
         mVehiclePropertyPublisher.addDataSubscriber(mMockDataSubscriber);
 
         mVehiclePropertyPublisher.removeDataSubscriber(mMockDataSubscriber);
-        // TODO(b/189143814): add proper verification
+
+        verify(mMockCarPropertyService, times(1)).unregisterListener(eq(PROP_ID_1), any());
+        assertThat(mVehiclePropertyPublisher.hasDataSubscriber(mMockDataSubscriber)).isFalse();
     }
 
     @Test
@@ -153,13 +170,21 @@
         Throwable error = assertThrows(IllegalArgumentException.class,
                 () -> mVehiclePropertyPublisher.removeDataSubscriber(mMockDataSubscriber));
 
-        assertThat(error).hasMessageThat().contains("subscriber not found");
+        assertThat(error).hasMessageThat().contains("DataSubscriber was not found");
     }
 
     @Test
     public void testRemoveAllDataSubscribers_succeeds() {
+        DataSubscriber subscriber2 = mock(DataSubscriber.class);
+        when(subscriber2.getPublisherParam()).thenReturn(PUBLISHER_PARAMS_1);
+        mVehiclePropertyPublisher.addDataSubscriber(mMockDataSubscriber);
+        mVehiclePropertyPublisher.addDataSubscriber(subscriber2);
+
         mVehiclePropertyPublisher.removeAllDataSubscribers();
-        // TODO(b/189143814): add tests
+
+        assertThat(mVehiclePropertyPublisher.hasDataSubscriber(mMockDataSubscriber)).isFalse();
+        assertThat(mVehiclePropertyPublisher.hasDataSubscriber(subscriber2)).isFalse();
+        verify(mMockCarPropertyService, times(1)).unregisterListener(eq(PROP_ID_1), any());
     }
 
     @Test