Add VehiclePropertyPublisher and generic publisher

- VehicleProperty is simple publisher, we will add it first.
- Add some unit tests to cover basic usage.
- Remove Logcat for now.

Test: atest CarServiceUnitTest:VehiclePropertyPublisherTest
Bug: 189143814
Change-Id: I03427554b33fb05f8c53d921ccf5bd18cc80b820
diff --git a/service/src/com/android/car/CarLog.java b/service/src/com/android/car/CarLog.java
index d629b50..4585eda 100644
--- a/service/src/com/android/car/CarLog.java
+++ b/service/src/com/android/car/CarLog.java
@@ -42,6 +42,7 @@
     public static final String TAG_SENSOR = "CAR.SENSOR";
     public static final String TAG_SERVICE = "CAR.SERVICE";
     public static final String TAG_STORAGE = "CAR.STORAGE";
+    public static final String TAG_TELEMETRY = "CAR.TELEMETRY";
     public static final String TAG_WATCHDOG = "CAR.WATCHDOG";
 
     /**
diff --git a/service/src/com/android/car/telemetry/databroker/DataBrokerImpl.java b/service/src/com/android/car/telemetry/databroker/DataBrokerImpl.java
index 3af9d2c..0a46536 100644
--- a/service/src/com/android/car/telemetry/databroker/DataBrokerImpl.java
+++ b/service/src/com/android/car/telemetry/databroker/DataBrokerImpl.java
@@ -17,11 +17,13 @@
 package com.android.car.telemetry.databroker;
 
 import android.util.ArrayMap;
+import android.util.Slog;
 
+import com.android.car.CarLog;
 import com.android.car.telemetry.TelemetryProto;
 import com.android.car.telemetry.TelemetryProto.MetricsConfig;
-import com.android.car.telemetry.publisher.LogcatPublisher;
-import com.android.car.telemetry.publisher.Publisher;
+import com.android.car.telemetry.publisher.AbstractPublisher;
+import com.android.car.telemetry.publisher.PublisherFactory;
 import com.android.internal.annotations.VisibleForTesting;
 
 import java.util.ArrayList;
@@ -34,17 +36,16 @@
  */
 public class DataBrokerImpl implements DataBroker {
 
-    // Publisher is created per data source type. Publishers are kept alive once created. This map
-    // is used to check if a publisher already exists for a given type to prevent duplicate
-    // instantiation.
-    private final Map<TelemetryProto.Publisher.PublisherCase, Publisher> mPublisherMap =
-            new ArrayMap<>();
-
     // Maps MetricsConfig's name to its subscriptions. This map is useful when removing a
     // MetricsConfig.
     private final Map<String, List<DataSubscriber>> mSubscriptionMap = new ArrayMap<>();
 
     private DataBrokerController.ScriptFinishedCallback mScriptFinishedCallback;
+    private final PublisherFactory mPublisherFactory;
+
+    public DataBrokerImpl(PublisherFactory publisherFactory) {
+        mPublisherFactory = publisherFactory;
+    }
 
     @Override
     public boolean addMetricsConfiguration(MetricsConfig metricsConfig) {
@@ -56,14 +57,21 @@
         List<DataSubscriber> dataSubscribers = new ArrayList<>();
         for (TelemetryProto.Subscriber subscriber : metricsConfig.getSubscribersList()) {
             // protobuf publisher to a concrete Publisher
-            Publisher publisher = getOrCreatePublisherFromType(
+            AbstractPublisher publisher = mPublisherFactory.getPublisher(
                     subscriber.getPublisher().getPublisherCase());
 
             // create DataSubscriber from TelemetryProto.Subscriber
             DataSubscriber dataSubscriber = new DataSubscriber(metricsConfig, subscriber);
             dataSubscribers.add(dataSubscriber);
 
-            publisher.addSubscriber(dataSubscriber); // add subscriber to receive data
+            try {
+                // The publisher will start sending data to the subscriber.
+                // TODO(b/191378559): handle bad configs
+                publisher.addDataSubscriber(dataSubscriber);
+            } catch (IllegalArgumentException e) {
+                Slog.w(CarLog.TAG_TELEMETRY, "Invalid config", e);
+                return false;
+            }
         }
         mSubscriptionMap.put(metricsConfig.getName(), dataSubscribers);
         return true;
@@ -78,9 +86,14 @@
         List<DataSubscriber> dataSubscribers = mSubscriptionMap.remove(metricsConfig.getName());
         // for each subscriber, remove it from publishers
         for (DataSubscriber subscriber : dataSubscribers) {
-            Publisher publisher = getOrCreatePublisherFromType(
+            AbstractPublisher publisher = mPublisherFactory.getPublisher(
                     subscriber.getPublisherParam().getPublisherCase());
-            publisher.removeSubscriber(subscriber);
+            try {
+                publisher.removeDataSubscriber(subscriber);
+            } catch (IllegalArgumentException e) {
+                // It shouldn't happen, but if happens, let's just log it.
+                Slog.w(CarLog.TAG_TELEMETRY, "Failed to remove subscriber from publisher", e);
+            }
             // TODO(b/187743369): remove related tasks from the queue
         }
         return true;
@@ -91,29 +104,6 @@
         mScriptFinishedCallback = callback;
     }
 
-    /**
-     * Gets and returns a {@link com.android.car.telemetry.publisher.Publisher} if it exists in
-     * the map, or creates one from the {@link com.android.car.telemetry.TelemetryProto.Publisher}'s
-     * type.
-     */
-    private Publisher getOrCreatePublisherFromType(
-            TelemetryProto.Publisher.PublisherCase type) {
-        Publisher publisher = mPublisherMap.get(type);
-        // check if publisher exists for this source
-        if (publisher != null) {
-            return publisher;
-        }
-        // TODO(b/187743369): use switch statement to create the correct publisher
-        publisher = new LogcatPublisher();
-        mPublisherMap.put(type, publisher);
-        return publisher;
-    }
-
-    @VisibleForTesting
-    Map<TelemetryProto.Publisher.PublisherCase, Publisher> getPublisherMap() {
-        return mPublisherMap;
-    }
-
     @VisibleForTesting
     Map<String, List<DataSubscriber>> getSubscriptionMap() {
         return mSubscriptionMap;
diff --git a/service/src/com/android/car/telemetry/databroker/DataSubscriber.java b/service/src/com/android/car/telemetry/databroker/DataSubscriber.java
index a32cc66..0c713c5 100644
--- a/service/src/com/android/car/telemetry/databroker/DataSubscriber.java
+++ b/service/src/com/android/car/telemetry/databroker/DataSubscriber.java
@@ -16,6 +16,8 @@
 
 package com.android.car.telemetry.databroker;
 
+import android.os.Bundle;
+
 import com.android.car.telemetry.TelemetryProto;
 
 /**
@@ -36,4 +38,9 @@
     public TelemetryProto.Publisher getPublisherParam() {
         return mSubscriber.getPublisher();
     }
+
+    /** Pushes data to the subscriber. */
+    public void push(Bundle data) {
+        // TODO(b/187743369): implement
+    }
 }
diff --git a/service/src/com/android/car/telemetry/publisher/AbstractPublisher.java b/service/src/com/android/car/telemetry/publisher/AbstractPublisher.java
new file mode 100644
index 0000000..f58a6fa
--- /dev/null
+++ b/service/src/com/android/car/telemetry/publisher/AbstractPublisher.java
@@ -0,0 +1,83 @@
+/*
+ * Copyright (C) 2021 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.android.car.telemetry.publisher;
+
+import com.android.car.telemetry.databroker.DataSubscriber;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+
+/**
+ * Abstract class for publishers. It is 1-1 with data source and manages sending data to
+ * subscribers. Publisher stops itself when there are no subscribers.
+ *
+ * <p>Note that it doesn't map 1-1 to {@link com.android.car.telemetry.TelemetryProto.Publisher}
+ * configuration. Single publisher instance can send data as several
+ * {@link com.android.car.telemetry.TelemetryProto.Publisher} to subscribers.
+ *
+ * <p>Not thread-safe.
+ */
+public abstract class AbstractPublisher {
+    private final HashSet<DataSubscriber> mDataSubscribers = new HashSet<>();
+
+    /**
+     * Adds a subscriber that listens for data produced by this publisher.
+     *
+     * @param subscriber a subscriber to receive data
+     * @throws IllegalArgumentException if an invalid subscriber was provided.
+     */
+    public void addDataSubscriber(DataSubscriber subscriber) {
+        // This method can throw exceptions if data is invalid - do now swap these 2 lines.
+        onDataSubscriberAdded(subscriber);
+        mDataSubscribers.add(subscriber);
+    }
+
+    /**
+     * Removes the subscriber from the publisher. Publisher stops if necessary.
+     *
+     * @throws IllegalArgumentException if the subscriber was not found.
+     */
+    public void removeDataSubscriber(DataSubscriber subscriber) {
+        if (!mDataSubscribers.remove(subscriber)) {
+            throw new IllegalArgumentException("Failed to remove, subscriber not found");
+        }
+        onDataSubscribersRemoved(Collections.singletonList(subscriber));
+    }
+
+    /**
+     * Removes all the subscribers from the publisher. The publisher may stop.
+     */
+    public void removeAllDataSubscribers() {
+        onDataSubscribersRemoved(mDataSubscribers);
+        mDataSubscribers.clear();
+    }
+
+    /**
+     * Called when a new subscriber is added to the publisher.
+     *
+     * @throws IllegalArgumentException if the invalid subscriber was provided.
+     */
+    protected abstract void onDataSubscriberAdded(DataSubscriber subscriber);
+
+    /** Called when subscribers are removed from the publisher. */
+    protected abstract void onDataSubscribersRemoved(Collection<DataSubscriber> subscribers);
+
+    protected HashSet<DataSubscriber> getDataSubscribers() {
+        return mDataSubscribers;
+    }
+}
diff --git a/service/src/com/android/car/telemetry/publisher/LogcatPublisher.java b/service/src/com/android/car/telemetry/publisher/LogcatPublisher.java
deleted file mode 100644
index 7bebe07..0000000
--- a/service/src/com/android/car/telemetry/publisher/LogcatPublisher.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Copyright (C) 2021 The Android Open Source Project
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.android.car.telemetry.publisher;
-
-import com.android.car.telemetry.databroker.DataSubscriber;
-
-/**
- * Publisher for Android logs (adb logcat).
- *
- * TODO(b/187525360): Move the logic from LogcatReader here.
- */
-public class LogcatPublisher implements Publisher {
-    @Override
-    public void addSubscriber(DataSubscriber dataSubscriber) {
-        // TODO(b/187525360): implement
-    }
-
-    @Override
-    public void removeSubscriber(DataSubscriber dataSubscriber) {
-        // TODO(b/187525360): implement
-    }
-}
diff --git a/service/src/com/android/car/telemetry/publisher/Publisher.java b/service/src/com/android/car/telemetry/publisher/Publisher.java
deleted file mode 100644
index d4d670e..0000000
--- a/service/src/com/android/car/telemetry/publisher/Publisher.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Copyright (C) 2021 The Android Open Source Project
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.android.car.telemetry.publisher;
-
-import com.android.car.telemetry.databroker.DataSubscriber;
-
-/**
- * Interface for publishers. It's implemented per data source, data is sent to
- * {@link DataSubscriber}. Publisher stops itself when there are no subscribers.
- *
- * <p>Note that it doesn't map 1-1 to {@link TelemetryProto.Publisher} configuration. Single
- * publisher instance can send data as several {@link TelemetryProto.Publisher} to subscribers.
- */
-public interface Publisher {
-    /** Adds a subscriber that listens for data produced by this publisher. */
-    /**
-     * Adds a subscriber that listens for data produced by this publisher.
-     * @param dataSubscriber a subscriber to receive data
-     * @throws IllegalArgumentException if the subscriber was added before.
-     */
-    void addSubscriber(DataSubscriber dataSubscriber);
-
-    /**  */
-    /**
-     * Removes a subscriber from the publisher.
-     *
-     * @param dataSubscriber to be removed
-     * @throws IllegalArgumentException if the subscriber was not found.
-     */
-    void removeSubscriber(DataSubscriber dataSubscriber);
-}
diff --git a/service/src/com/android/car/telemetry/publisher/PublisherFactory.java b/service/src/com/android/car/telemetry/publisher/PublisherFactory.java
new file mode 100644
index 0000000..8d0975a
--- /dev/null
+++ b/service/src/com/android/car/telemetry/publisher/PublisherFactory.java
@@ -0,0 +1,54 @@
+/*
+ * Copyright (C) 2021 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.android.car.telemetry.publisher;
+
+import com.android.car.CarPropertyService;
+import com.android.car.telemetry.TelemetryProto;
+
+/**
+ * Factory class for Publishers. It's expected to have a single factory instance.
+ *
+ * <p>Thread-safe.
+ */
+public class PublisherFactory {
+    private final Object mLock = new Object();
+    private final CarPropertyService mCarPropertyService;
+    private VehiclePropertyPublisher mVehiclePropertyPublisher;
+
+    public PublisherFactory(CarPropertyService carPropertyService) {
+        mCarPropertyService = carPropertyService;
+    }
+
+    /** Returns publisher by given type. */
+    public AbstractPublisher getPublisher(
+            TelemetryProto.Publisher.PublisherCase type) {
+        // No need to optimize locks, as this method is infrequently called.
+        synchronized (mLock) {
+            switch (type.getNumber()) {
+                case TelemetryProto.Publisher.VEHICLE_PROPERTY_FIELD_NUMBER:
+                    if (mVehiclePropertyPublisher == null) {
+                        mVehiclePropertyPublisher = new VehiclePropertyPublisher(
+                                mCarPropertyService);
+                    }
+                    return mVehiclePropertyPublisher;
+                default:
+                    throw new IllegalArgumentException(
+                            "Publisher type " + type + " is not supported");
+            }
+        }
+    }
+}
diff --git a/service/src/com/android/car/telemetry/publisher/VehiclePropertyPublisher.java b/service/src/com/android/car/telemetry/publisher/VehiclePropertyPublisher.java
new file mode 100644
index 0000000..ed0b19b
--- /dev/null
+++ b/service/src/com/android/car/telemetry/publisher/VehiclePropertyPublisher.java
@@ -0,0 +1,118 @@
+/*
+ * Copyright (C) 2021 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.android.car.telemetry.publisher;
+
+import android.car.VehiclePropertyIds;
+import android.car.hardware.CarPropertyConfig;
+import android.car.hardware.property.CarPropertyEvent;
+import android.car.hardware.property.ICarPropertyEventListener;
+import android.os.Bundle;
+import android.os.RemoteException;
+import android.util.Slog;
+import android.util.SparseArray;
+
+import com.android.car.CarLog;
+import com.android.car.CarPropertyService;
+import com.android.car.telemetry.TelemetryProto;
+import com.android.car.telemetry.databroker.DataSubscriber;
+import com.android.internal.util.Preconditions;
+
+import java.util.Collection;
+import java.util.List;
+
+/**
+ * Publisher for Vehicle Property changes, aka {@code CarPropertyService}.
+ *
+ * <p>TODO(b/187525360): Add car property listener logic
+ */
+public class VehiclePropertyPublisher extends AbstractPublisher {
+    private static final boolean DEBUG = false;  // STOPSHIP if true
+
+    /** Bundle key for {@link CarPropertyEvent}. */
+    public static final String CAR_PROPERTY_EVENT_KEY = "car_property_event";
+
+    private final CarPropertyService mCarPropertyService;
+    private final SparseArray<CarPropertyConfig> mCarPropertyList;
+
+    private final ICarPropertyEventListener mCarPropertyEventListener =
+            new ICarPropertyEventListener.Stub() {
+                @Override
+                public void onEvent(List<CarPropertyEvent> events) throws RemoteException {
+                    if (DEBUG) {
+                        Slog.d(CarLog.TAG_TELEMETRY,
+                                "Received " + events.size() + " vehicle property events");
+                    }
+                    for (CarPropertyEvent event : events) {
+                        onVehicleEvent(event);
+                    }
+                }
+            };
+
+    public VehiclePropertyPublisher(CarPropertyService carPropertyService) {
+        mCarPropertyService = carPropertyService;
+        // Load car property list once, as the list doesn't change runtime.
+        mCarPropertyList = new SparseArray<>();
+        for (CarPropertyConfig property : mCarPropertyService.getPropertyList()) {
+            mCarPropertyList.append(property.getPropertyId(), property);
+        }
+    }
+
+    @Override
+    protected void onDataSubscriberAdded(DataSubscriber subscriber) {
+        TelemetryProto.Publisher publisherParam = subscriber.getPublisherParam();
+        Preconditions.checkArgument(
+                publisherParam.getPublisherCase()
+                        == TelemetryProto.Publisher.PublisherCase.VEHICLE_PROPERTY,
+                "Subscribers only with VehicleProperty publisher are supported by this class.");
+        int propertyId = publisherParam.getVehicleProperty().getVehiclePropertyId();
+        CarPropertyConfig config = mCarPropertyList.get(propertyId);
+        Preconditions.checkArgument(
+                config != null,
+                "Vehicle property " + VehiclePropertyIds.toString(propertyId) + " not found.");
+        Preconditions.checkArgument(
+                config.getAccess() == CarPropertyConfig.VEHICLE_PROPERTY_ACCESS_READ
+                        || config.getAccess()
+                        == CarPropertyConfig.VEHICLE_PROPERTY_ACCESS_READ_WRITE,
+                "No access. Cannot read " + VehiclePropertyIds.toString(propertyId) + ".");
+        mCarPropertyService.registerListener(
+                propertyId,
+                publisherParam.getVehicleProperty().getReadRate(),
+                mCarPropertyEventListener);
+    }
+
+    @Override
+    protected void onDataSubscribersRemoved(Collection<DataSubscriber> subscribers) {
+        // TODO(b/190230611): Remove car property listener
+    }
+
+    /**
+     * Called when publisher receives new events. It's called on CarPropertyService's worker
+     * thread.
+     */
+    private void onVehicleEvent(CarPropertyEvent event) {
+        Bundle bundle = new Bundle();
+        bundle.putParcelable(CAR_PROPERTY_EVENT_KEY, event);
+        for (DataSubscriber subscriber : getDataSubscribers()) {
+            TelemetryProto.Publisher publisherParam = subscriber.getPublisherParam();
+            if (event.getCarPropertyValue().getPropertyId()
+                    != publisherParam.getVehicleProperty().getVehiclePropertyId()) {
+                continue;
+            }
+            subscriber.push(bundle);
+        }
+    }
+}
diff --git a/tests/carservice_unit_test/src/com/android/car/telemetry/databroker/DataBrokerUnitTest.java b/tests/carservice_unit_test/src/com/android/car/telemetry/databroker/DataBrokerUnitTest.java
index 2f6f0d8..45c5a22 100644
--- a/tests/carservice_unit_test/src/com/android/car/telemetry/databroker/DataBrokerUnitTest.java
+++ b/tests/carservice_unit_test/src/com/android/car/telemetry/databroker/DataBrokerUnitTest.java
@@ -18,19 +18,33 @@
 
 import static com.google.common.truth.Truth.assertThat;
 
-import com.android.car.telemetry.TelemetryProto;
+import static org.mockito.Mockito.when;
 
+import android.car.hardware.CarPropertyConfig;
+
+import com.android.car.CarPropertyService;
+import com.android.car.telemetry.TelemetryProto;
+import com.android.car.telemetry.publisher.PublisherFactory;
+
+import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
+import org.mockito.Mock;
 import org.mockito.junit.MockitoJUnitRunner;
 
+import java.util.Collections;
+
 @RunWith(MockitoJUnitRunner.class)
 public class DataBrokerUnitTest {
-    private final DataBrokerImpl mDataBroker = new DataBrokerImpl();
+    private static final int PROP_ID = 100;
+    private static final int PROP_AREA = 200;
+    private static final CarPropertyConfig<Integer> PROP_CONFIG =
+            CarPropertyConfig.newBuilder(Integer.class, PROP_ID, PROP_AREA).setAccess(
+                    CarPropertyConfig.VEHICLE_PROPERTY_ACCESS_READ).build();
     private static final TelemetryProto.VehiclePropertyPublisher
             VEHICLE_PROPERTY_PUBLISHER_CONFIGURATION =
             TelemetryProto.VehiclePropertyPublisher.newBuilder().setReadRate(
-                    1).setVehiclePropertyId(1000).build();
+                    1).setVehiclePropertyId(PROP_ID).build();
     private static final TelemetryProto.Publisher PUBLISHER_CONFIGURATION =
             TelemetryProto.Publisher.newBuilder().setVehicleProperty(
                     VEHICLE_PROPERTY_PUBLISHER_CONFIGURATION).build();
@@ -47,12 +61,23 @@
             TelemetryProto.MetricsConfig.newBuilder().setName("Bar").setVersion(
                     1).addSubscribers(SUBSCRIBER_BAR).build();
 
+    @Mock
+    private CarPropertyService mMockCarPropertyService;
+
+    private DataBrokerImpl mDataBroker;
+
+    @Before
+    public void setUp() {
+        when(mMockCarPropertyService.getPropertyList())
+                .thenReturn(Collections.singletonList(PROP_CONFIG));
+        PublisherFactory factory = new PublisherFactory(mMockCarPropertyService);
+        mDataBroker = new DataBrokerImpl(factory);
+    }
+
     @Test
     public void testAddMetricsConfiguration_newMetricsConfig() {
         mDataBroker.addMetricsConfiguration(METRICS_CONFIG_FOO);
 
-        assertThat(mDataBroker.getPublisherMap().containsKey(
-                TelemetryProto.Publisher.PublisherCase.VEHICLE_PROPERTY)).isTrue();
         assertThat(mDataBroker.getSubscriptionMap()).containsKey(METRICS_CONFIG_FOO.getName());
         // there should be one data subscriber in the subscription list of METRICS_CONFIG_FOO
         assertThat(mDataBroker.getSubscriptionMap().get(METRICS_CONFIG_FOO.getName())).hasSize(1);
@@ -63,7 +88,6 @@
         mDataBroker.addMetricsConfiguration(METRICS_CONFIG_FOO);
         mDataBroker.addMetricsConfiguration(METRICS_CONFIG_BAR);
 
-        assertThat(mDataBroker.getPublisherMap()).hasSize(1);
         assertThat(mDataBroker.getSubscriptionMap()).containsKey(METRICS_CONFIG_FOO.getName());
         assertThat(mDataBroker.getSubscriptionMap()).containsKey(METRICS_CONFIG_BAR.getName());
     }
@@ -78,18 +102,6 @@
     }
 
     @Test
-    public void testRemoveMetricsConfiguration_publisherShouldExist() {
-        mDataBroker.addMetricsConfiguration(METRICS_CONFIG_FOO);
-
-        mDataBroker.removeMetricsConfiguration(METRICS_CONFIG_FOO);
-
-        assertThat(mDataBroker.getPublisherMap()).containsKey(
-                TelemetryProto.Publisher.PublisherCase.VEHICLE_PROPERTY);
-        assertThat(mDataBroker.getSubscriptionMap()).doesNotContainKey(
-                METRICS_CONFIG_FOO.getName());
-    }
-
-    @Test
     public void testRemoveMetricsConfiguration_removeNonexistentMetricsConfig() {
         boolean status = mDataBroker.removeMetricsConfiguration(METRICS_CONFIG_FOO);
 
diff --git a/tests/carservice_unit_test/src/com/android/car/telemetry/publisher/VehiclePropertyPublisherTest.java b/tests/carservice_unit_test/src/com/android/car/telemetry/publisher/VehiclePropertyPublisherTest.java
new file mode 100644
index 0000000..bcd408a
--- /dev/null
+++ b/tests/carservice_unit_test/src/com/android/car/telemetry/publisher/VehiclePropertyPublisherTest.java
@@ -0,0 +1,178 @@
+/*
+ * Copyright (C) 2021 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.android.car.telemetry.publisher;
+
+import static android.car.hardware.property.CarPropertyEvent.PROPERTY_EVENT_PROPERTY_CHANGE;
+
+import static com.google.common.truth.Truth.assertThat;
+
+import static org.junit.Assert.assertThrows;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyFloat;
+import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import android.car.hardware.CarPropertyConfig;
+import android.car.hardware.CarPropertyValue;
+import android.car.hardware.property.CarPropertyEvent;
+import android.car.hardware.property.ICarPropertyEventListener;
+import android.os.Bundle;
+
+import com.android.car.CarPropertyService;
+import com.android.car.telemetry.TelemetryProto;
+import com.android.car.telemetry.databroker.DataSubscriber;
+
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Captor;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnitRunner;
+
+import java.util.Collections;
+import java.util.List;
+
+@RunWith(MockitoJUnitRunner.class)
+public class VehiclePropertyPublisherTest {
+    private static final int PROP_ID_1 = 100;
+    private static final int PROP_ID_2 = 102;
+    private static final int AREA_ID = 20;
+    private static final float PROP_READ_RATE = 0.0f;
+    private static final CarPropertyEvent PROP_EVENT_1 =
+            new CarPropertyEvent(PROPERTY_EVENT_PROPERTY_CHANGE,
+                    new CarPropertyValue<>(PROP_ID_1, AREA_ID, /* value= */ 1));
+
+    private static final TelemetryProto.Publisher PUBLISHER_PARAMS_1 =
+            TelemetryProto.Publisher.newBuilder()
+                    .setVehicleProperty(TelemetryProto.VehiclePropertyPublisher.newBuilder()
+                            .setReadRate(PROP_READ_RATE)
+                            .setVehiclePropertyId(PROP_ID_1))
+                    .build();
+    private static final TelemetryProto.Publisher PUBLISHER_PARAMS_INVALID =
+            TelemetryProto.Publisher.newBuilder()
+                    .setVehicleProperty(TelemetryProto.VehiclePropertyPublisher.newBuilder()
+                            .setReadRate(PROP_READ_RATE)
+                            .setVehiclePropertyId(-200))
+                    .build();
+
+    // mMockCarPropertyService supported car property list.
+    private static final CarPropertyConfig<Integer> PROP_CONFIG_1 =
+            CarPropertyConfig.newBuilder(Integer.class, PROP_ID_1, AREA_ID).setAccess(
+                    CarPropertyConfig.VEHICLE_PROPERTY_ACCESS_READ).build();
+    private static final CarPropertyConfig<Integer> PROP_CONFIG_2_WRITE_ONLY =
+            CarPropertyConfig.newBuilder(Integer.class, PROP_ID_2, AREA_ID).setAccess(
+                    CarPropertyConfig.VEHICLE_PROPERTY_ACCESS_WRITE).build();
+
+    @Mock
+    private DataSubscriber mMockDataSubscriber;
+
+    @Mock
+    private CarPropertyService mMockCarPropertyService;
+
+    @Captor
+    private ArgumentCaptor<ICarPropertyEventListener> mCarPropertyCallbackCaptor;
+    @Captor
+    private ArgumentCaptor<Bundle> mBundleCaptor;
+
+    private VehiclePropertyPublisher mVehiclePropertyPublisher;
+
+    @Before
+    public void setUp() {
+        when(mMockDataSubscriber.getPublisherParam()).thenReturn(PUBLISHER_PARAMS_1);
+        when(mMockCarPropertyService.getPropertyList())
+                .thenReturn(List.of(PROP_CONFIG_1, PROP_CONFIG_2_WRITE_ONLY));
+        mVehiclePropertyPublisher = new VehiclePropertyPublisher(mMockCarPropertyService);
+    }
+
+    @Test
+    public void testAddDataSubscriber_registersNewCallback() {
+        mVehiclePropertyPublisher.addDataSubscriber(mMockDataSubscriber);
+
+        verify(mMockCarPropertyService).registerListener(eq(PROP_ID_1), eq(PROP_READ_RATE), any());
+        assertThat(mVehiclePropertyPublisher.getDataSubscribers()).hasSize(1);
+    }
+
+    @Test
+    public void testAddDataSubscriber_failsIfInvalidCarProperty() {
+        DataSubscriber invalidDataSubscriber = mock(DataSubscriber.class);
+        when(invalidDataSubscriber.getPublisherParam()).thenReturn(
+                TelemetryProto.Publisher.newBuilder()
+                        .setVehicleProperty(TelemetryProto.VehiclePropertyPublisher.newBuilder()
+                                .setReadRate(PROP_READ_RATE)
+                                .setVehiclePropertyId(PROP_ID_2))
+                        .build());
+
+        Throwable error = assertThrows(IllegalArgumentException.class,
+                () -> mVehiclePropertyPublisher.addDataSubscriber(invalidDataSubscriber));
+
+        assertThat(error).hasMessageThat().contains("No access.");
+        assertThat(mVehiclePropertyPublisher.getDataSubscribers()).isEmpty();
+    }
+
+    @Test
+    public void testAddDataSubscriber_failsIfNoReadAccess() {
+        DataSubscriber invalidDataSubscriber = mock(DataSubscriber.class);
+        when(invalidDataSubscriber.getPublisherParam()).thenReturn(PUBLISHER_PARAMS_INVALID);
+
+        Throwable error = assertThrows(IllegalArgumentException.class,
+                () -> mVehiclePropertyPublisher.addDataSubscriber(invalidDataSubscriber));
+
+        assertThat(error).hasMessageThat().contains("not found");
+        assertThat(mVehiclePropertyPublisher.getDataSubscribers()).isEmpty();
+    }
+
+    @Test
+    public void testRemoveDataSubscriber_succeeds() {
+        mVehiclePropertyPublisher.addDataSubscriber(mMockDataSubscriber);
+
+        mVehiclePropertyPublisher.removeDataSubscriber(mMockDataSubscriber);
+        // TODO(b/189143814): add proper verification
+    }
+
+    @Test
+    public void testRemoveDataSubscriber_failsIfNotFound() {
+        Throwable error = assertThrows(IllegalArgumentException.class,
+                () -> mVehiclePropertyPublisher.removeDataSubscriber(mMockDataSubscriber));
+
+        assertThat(error).hasMessageThat().contains("subscriber not found");
+    }
+
+    @Test
+    public void testRemoveAllDataSubscribers_succeeds() {
+        mVehiclePropertyPublisher.removeAllDataSubscribers();
+        // TODO(b/189143814): add tests
+    }
+
+    @Test
+    public void testOnNewCarPropertyEvent_pushesValueToDataSubscriber() throws Exception {
+        doNothing().when(mMockCarPropertyService).registerListener(
+                anyInt(), anyFloat(), mCarPropertyCallbackCaptor.capture());
+        mVehiclePropertyPublisher.addDataSubscriber(mMockDataSubscriber);
+
+        mCarPropertyCallbackCaptor.getValue().onEvent(Collections.singletonList(PROP_EVENT_1));
+
+        verify(mMockDataSubscriber).push(mBundleCaptor.capture());
+        CarPropertyEvent event = mBundleCaptor.getValue().getParcelable(
+                VehiclePropertyPublisher.CAR_PROPERTY_EVENT_KEY);
+        assertThat(event).isEqualTo(PROP_EVENT_1);
+    }
+}