Merge "Fix playback state callback crash" into qt-dev
diff --git a/service/src/com/android/car/VmsSubscriberService.java b/service/src/com/android/car/VmsSubscriberService.java
index fc28978..aaadf4f 100644
--- a/service/src/com/android/car/VmsSubscriberService.java
+++ b/service/src/com/android/car/VmsSubscriberService.java
@@ -200,7 +200,7 @@
             VmsHalService hal) {
         mContext = context;
         mBrokerService = brokerService;
-        hal.setVmsSubscriberService(this);
+        hal.setVmsSubscriberService(this, mBrokerService::removeDeadSubscriber);
     }
 
     // Implements CarServiceBase interface.
diff --git a/service/src/com/android/car/hal/VmsHalService.java b/service/src/com/android/car/hal/VmsHalService.java
index 1f6c2e2..487ec1f 100644
--- a/service/src/com/android/car/hal/VmsHalService.java
+++ b/service/src/com/android/car/hal/VmsHalService.java
@@ -40,26 +40,30 @@
 import android.hardware.automotive.vehicle.V2_0.VmsMessageWithLayerIntegerValuesIndex;
 import android.hardware.automotive.vehicle.V2_0.VmsOfferingMessageIntegerValuesIndex;
 import android.hardware.automotive.vehicle.V2_0.VmsPublisherInformationIntegerValuesIndex;
+import android.hardware.automotive.vehicle.V2_0.VmsStartSessionMessageIntegerValuesIndex;
 import android.os.Handler;
 import android.os.HandlerThread;
 import android.os.IBinder;
 import android.os.Message;
 import android.os.RemoteException;
+import android.os.SystemClock;
 import android.util.ArraySet;
 import android.util.Log;
 
-import androidx.annotation.GuardedBy;
 import androidx.annotation.VisibleForTesting;
 
 import com.android.car.CarLog;
 
 import java.io.PrintWriter;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
+import java.util.function.Consumer;
+import java.util.function.Supplier;
 
 /**
  * VMS client implementation that proxies VmsPublisher/VmsSubscriber API calls to the Vehicle HAL
@@ -72,18 +76,20 @@
     private static final String TAG = "VmsHalService";
     private static final int HAL_PROPERTY_ID = VehicleProperty.VEHICLE_MAP_SERVICE;
     private static final int NUM_INTEGERS_IN_VMS_LAYER = 3;
+    private static final int UNKNOWN_CLIENT_ID = -1;
 
     private final VehicleHal mVehicleHal;
+    private final int mCoreId;
+    private final MessageQueue mMessageQueue;
     private volatile boolean mIsSupported = false;
 
-    private IBinder mPublisherToken;
     private IVmsPublisherService mPublisherService;
-    private IVmsSubscriberService mSubscriberService;
+    private Consumer<IBinder> mPublisherOnHalConnected;
+    private Runnable mPublisherOnHalDisconnected;
+    private IBinder mPublisherToken;
 
-    @GuardedBy("this")
-    private HandlerThread mHandlerThread;
-    @GuardedBy("this")
-    private Handler mHandler;
+    private IVmsSubscriberService mSubscriberService;
+    private Consumer<IVmsSubscriberClient> mSuscriberOnHalDisconnected;
 
     private int mSubscriptionStateSequence = -1;
     private int mAvailableLayersSequence = -1;
@@ -97,14 +103,19 @@
 
         @Override
         public void onVmsSubscriptionChange(VmsSubscriptionState subscriptionState) {
-            // Registration of this callback is handled by VmsPublisherService.
-            // As a result, HAL support must be checked whenever the callback is triggered.
-            if (!mIsSupported) {
+            if (DBG) Log.d(TAG, "Handling a subscription state change");
+            // Drop out-of-order notifications
+            if (subscriptionState.getSequenceNumber() <= mSubscriptionStateSequence) {
+                Log.w(TAG,
+                        String.format("Out of order subscription state received: %d (expecting %d)",
+                                subscriptionState.getSequenceNumber(),
+                                mSubscriptionStateSequence + 1));
                 return;
             }
-            if (DBG) Log.d(TAG, "Handling a subscription state change");
-            Message.obtain(mHandler, VmsMessageType.SUBSCRIPTIONS_CHANGE, subscriptionState)
-                    .sendToTarget();
+            mSubscriptionStateSequence = subscriptionState.getSequenceNumber();
+            mMessageQueue.enqueue(VmsMessageType.SUBSCRIPTIONS_CHANGE,
+                    createSubscriptionStateMessage(VmsMessageType.SUBSCRIPTIONS_CHANGE,
+                            subscriptionState));
         }
     };
 
@@ -112,67 +123,87 @@
         @Override
         public void onVmsMessageReceived(VmsLayer layer, byte[] payload) {
             if (DBG) Log.d(TAG, "Handling a data message for Layer: " + layer);
-            // TODO(b/124130256): Set publisher ID of data message
-            Message.obtain(mHandler, VmsMessageType.DATA, createDataMessage(layer, 0, payload))
-                    .sendToTarget();
+            mMessageQueue.enqueue(VmsMessageType.DATA, createDataMessage(layer, payload));
         }
 
         @Override
         public void onLayersAvailabilityChanged(VmsAvailableLayers availableLayers) {
             if (DBG) Log.d(TAG, "Handling a layer availability change");
-            Message.obtain(mHandler, VmsMessageType.AVAILABILITY_CHANGE, availableLayers)
-                    .sendToTarget();
+            // Drop out-of-order notifications
+            if (availableLayers.getSequence() <= mAvailableLayersSequence) {
+                Log.w(TAG,
+                        String.format("Out of order layer availability received: %d (expecting %d)",
+                                availableLayers.getSequence(),
+                                mAvailableLayersSequence + 1));
+                return;
+            }
+            mAvailableLayersSequence = availableLayers.getSequence();
+            mMessageQueue.enqueue(VmsMessageType.AVAILABILITY_CHANGE,
+                    createAvailableLayersMessage(VmsMessageType.AVAILABILITY_CHANGE,
+                            availableLayers));
         }
     };
 
-    private final Handler.Callback mHandlerCallback = msg -> {
-        int messageType = msg.what;
-        VehiclePropValue vehicleProp = null;
-        switch (messageType) {
-            case VmsMessageType.DATA:
-                vehicleProp = (VehiclePropValue) msg.obj;
-                break;
-            case VmsMessageType.SUBSCRIPTIONS_CHANGE:
-                VmsSubscriptionState subscriptionState = (VmsSubscriptionState) msg.obj;
-                // Drop out-of-order notifications
-                if (subscriptionState.getSequenceNumber() <= mSubscriptionStateSequence) {
-                    break;
-                }
-                vehicleProp = createSubscriptionStateMessage(
-                        VmsMessageType.SUBSCRIPTIONS_CHANGE,
-                        subscriptionState);
-                mSubscriptionStateSequence = subscriptionState.getSequenceNumber();
-                break;
-            case VmsMessageType.AVAILABILITY_CHANGE:
-                VmsAvailableLayers availableLayers = (VmsAvailableLayers) msg.obj;
-                // Drop out-of-order notifications
-                if (availableLayers.getSequence() <= mAvailableLayersSequence) {
-                    break;
-                }
-                vehicleProp = createAvailableLayersMessage(
-                        VmsMessageType.AVAILABILITY_CHANGE,
-                        availableLayers);
-                mAvailableLayersSequence = availableLayers.getSequence();
-                break;
-            default:
-                Log.e(TAG, "Unexpected message type: " + messageType);
+    private class MessageQueue implements Handler.Callback {
+        private final Set<Integer> mSupportedMessageTypes = new ArraySet<>(Arrays.asList(
+                VmsMessageType.DATA,
+                VmsMessageType.START_SESSION,
+                VmsMessageType.AVAILABILITY_CHANGE,
+                VmsMessageType.SUBSCRIPTIONS_CHANGE
+        ));
+        private HandlerThread mHandlerThread;
+        private Handler mHandler;
+
+        synchronized void init() {
+            mHandlerThread = new HandlerThread(TAG);
+            mHandlerThread.start();
+            mHandler = new Handler(mHandlerThread.getLooper(), this);
         }
-        if (vehicleProp != null) {
+
+        synchronized void release() {
+            if (mHandlerThread != null) {
+                mHandlerThread.quitSafely();
+            }
+        }
+
+        synchronized void enqueue(int messageType, Object message) {
+            if (mSupportedMessageTypes.contains(messageType)) {
+                Message.obtain(mHandler, messageType, message).sendToTarget();
+            } else {
+                Log.e(TAG, "Unexpected message type: " + VmsMessageType.toString(messageType));
+            }
+        }
+
+        synchronized void clear() {
+            mSupportedMessageTypes.forEach(mHandler::removeMessages);
+        }
+
+        @Override
+        public boolean handleMessage(Message msg) {
+            int messageType = msg.what;
+            VehiclePropValue vehicleProp = (VehiclePropValue) msg.obj;
             if (DBG) Log.d(TAG, "Sending " + VmsMessageType.toString(messageType) + " message");
             try {
                 setPropertyValue(vehicleProp);
             } catch (RemoteException e) {
                 Log.e(TAG, "While sending " + VmsMessageType.toString(messageType));
             }
+            return true;
         }
-        return true;
-    };
+    }
 
     /**
      * Constructor used by {@link VehicleHal}
      */
     VmsHalService(VehicleHal vehicleHal) {
+        this(vehicleHal, SystemClock::uptimeMillis);
+    }
+
+    @VisibleForTesting
+    VmsHalService(VehicleHal vehicleHal, Supplier<Long> getCoreId) {
         mVehicleHal = vehicleHal;
+        mCoreId = (int) (getCoreId.get() % Integer.MAX_VALUE);
+        mMessageQueue = new MessageQueue();
     }
 
     /**
@@ -180,21 +211,25 @@
      */
     @VisibleForTesting
     Handler getHandler() {
-        return mHandler;
+        return mMessageQueue.mHandler;
     }
 
     /**
      * Gets the {@link IVmsPublisherClient} implementation for the HAL's publisher callback.
      */
-    public IBinder getPublisherClient() {
-        return mPublisherClient.asBinder();
+    public void setPublisherConnectionCallbacks(Consumer<IBinder> onHalConnected,
+            Runnable onHalDisconnected) {
+        mPublisherOnHalConnected = onHalConnected;
+        mPublisherOnHalDisconnected = onHalDisconnected;
     }
 
     /**
      * Sets a reference to the {@link IVmsSubscriberService} implementation for use by the HAL.
      */
-    public void setVmsSubscriberService(IVmsSubscriberService service) {
+    public void setVmsSubscriberService(IVmsSubscriberService service,
+            Consumer<IVmsSubscriberClient> onHalDisconnected) {
         mSubscriberService = service;
+        mSuscriberOnHalDisconnected = onHalDisconnected;
     }
 
     @Override
@@ -219,39 +254,14 @@
             return; // Do not continue initialization
         }
 
-        synchronized (this) {
-            mHandlerThread = new HandlerThread(TAG);
-            mHandlerThread.start();
-            mHandler = new Handler(mHandlerThread.getLooper(), mHandlerCallback);
-        }
-
-        if (mSubscriberService != null) {
-            try {
-                mSubscriberService.addVmsSubscriberToNotifications(mSubscriberClient);
-            } catch (RemoteException e) {
-                Log.e(TAG, "While adding subscriber callback", e);
-            }
-
-            // Publish layer availability to HAL clients (this triggers HAL client initialization)
-            try {
-                mSubscriberClient.onLayersAvailabilityChanged(
-                        mSubscriberService.getAvailableLayers());
-            } catch (RemoteException e) {
-                Log.e(TAG, "While publishing layer availability", e);
-            }
-        } else if (DBG) {
-            Log.d(TAG, "VmsSubscriberService not registered");
-        }
+        mMessageQueue.init();
+        mMessageQueue.enqueue(VmsMessageType.START_SESSION,
+                createStartSessionMessage(mCoreId, UNKNOWN_CLIENT_ID));
     }
 
     @Override
     public void release() {
-        synchronized (this) {
-            if (mHandlerThread != null) {
-                mHandlerThread.quitSafely();
-            }
-        }
-
+        mMessageQueue.release();
         mSubscriptionStateSequence = -1;
         mAvailableLayersSequence = -1;
 
@@ -330,6 +340,9 @@
                     case VmsMessageType.SUBSCRIPTIONS_REQUEST:
                         handleSubscriptionsRequestEvent();
                         break;
+                    case VmsMessageType.START_SESSION:
+                        handleStartSessionEvent(vec);
+                        break;
                     default:
                         Log.e(TAG, "Unexpected message type: " + messageType);
                 }
@@ -340,6 +353,72 @@
     }
 
     /**
+     * SESSION_START message format:
+     * <ul>
+     * <li>Message type
+     * <li>Core ID
+     * <li>Client ID
+     * </ul>
+     */
+    private void handleStartSessionEvent(List<Integer> message) {
+        int coreId = message.get(VmsStartSessionMessageIntegerValuesIndex.SERVICE_ID);
+        int clientId = message.get(VmsStartSessionMessageIntegerValuesIndex.CLIENT_ID);
+        if (DBG) {
+            Log.d(TAG,
+                    "Handling a session start event with coreId: " + coreId + " client: "
+                            + clientId);
+        }
+
+        if (coreId != mCoreId) {
+            if (mPublisherOnHalDisconnected != null) {
+                mPublisherOnHalDisconnected.run();
+            } else {
+                Log.w(TAG, "Publisher disconnect callback not registered");
+            }
+            if (mSuscriberOnHalDisconnected != null) {
+                mSuscriberOnHalDisconnected.accept(mSubscriberClient);
+            } else {
+                Log.w(TAG, "Subscriber disconnect callback not registered");
+            }
+
+            // Drop all queued messages and client state
+            mMessageQueue.clear();
+            mSubscriptionStateSequence = -1;
+            mAvailableLayersSequence = -1;
+
+            // Enqueue an acknowledgement message
+            mMessageQueue.enqueue(VmsMessageType.START_SESSION,
+                    createStartSessionMessage(mCoreId, clientId));
+        }
+
+        // Notify client manager of connection
+        if (mPublisherOnHalConnected != null) {
+            mPublisherOnHalConnected.accept(mPublisherClient);
+        } else {
+            Log.w(TAG, "Publisher connect callback not registered");
+        }
+
+        // Notify subscriber service of connection
+        if (mSubscriberService != null) {
+            try {
+                mSubscriberService.addVmsSubscriberToNotifications(mSubscriberClient);
+            } catch (RemoteException e) {
+                Log.e(TAG, "While adding subscriber callback", e);
+            }
+
+            // Publish layer availability to HAL clients (this triggers HAL client initialization)
+            try {
+                mSubscriberClient.onLayersAvailabilityChanged(
+                        mSubscriberService.getAvailableLayers());
+            } catch (RemoteException e) {
+                Log.e(TAG, "While publishing layer availability", e);
+            }
+        } else {
+            Log.w(TAG, "Subscriber connect callback not registered");
+        }
+    }
+
+    /**
      * DATA message format:
      * <ul>
      * <li>Message type
@@ -588,6 +667,30 @@
     }
 
     /**
+     * Creates a SESSION_START type {@link VehiclePropValue}.
+     *
+     * SESSION_START message format:
+     * <ul>
+     * <li>Message type
+     * <li>Core ID
+     * <li>Client ID
+     * </ul>
+     */
+    private static VehiclePropValue createStartSessionMessage(int coreId, int clientId) {
+        // Message type + layer
+        VehiclePropValue vehicleProp = createVmsMessage(VmsMessageType.START_SESSION);
+        List<Integer> message = vehicleProp.value.int32Values;
+
+        // Core ID
+        message.add(coreId);
+
+        // Client ID
+        message.add(clientId);
+
+        return vehicleProp;
+    }
+
+    /**
      * Creates a DATA type {@link VehiclePropValue}.
      *
      * DATA message format:
@@ -602,14 +705,15 @@
      *
      * @param layer Layer for which message was published.
      */
-    private static VehiclePropValue createDataMessage(VmsLayer layer, int publisherId,
-            byte[] payload) {
+    private static VehiclePropValue createDataMessage(VmsLayer layer, byte[] payload) {
         // Message type + layer
-        VehiclePropValue vehicleProp = createVmsMessageWithLayer(VmsMessageType.DATA, layer);
+        VehiclePropValue vehicleProp = createVmsMessage(VmsMessageType.DATA);
+        appendLayer(vehicleProp.value.int32Values, layer);
         List<Integer> message = vehicleProp.value.int32Values;
 
         // Publisher ID
-        message.add(publisherId);
+        // TODO(b/124130256): Set publisher ID of data message
+        message.add(0);
 
         // Payload
         appendBytes(vehicleProp.value.bytes, payload);
@@ -709,20 +813,6 @@
     }
 
     /**
-     * Creates a {@link VehiclePropValue} of the requested message type, with layer message fields
-     * populated. Other message fields are *not* populated.
-     *
-     * @param messageType Type of message, from {@link VmsMessageType}
-     * @param layer       Layer affected by message.
-     */
-    private static VehiclePropValue createVmsMessageWithLayer(
-            int messageType, VmsLayer layer) {
-        VehiclePropValue vehicleProp = createVmsMessage(messageType);
-        appendLayer(vehicleProp.value.int32Values, layer);
-        return vehicleProp;
-    }
-
-    /**
      * Appends a {@link VmsLayer} to an encoded VMS message.
      *
      * Layer format:
@@ -761,9 +851,7 @@
         message.add(layer.getVmsLayer().getSubtype());
         message.add(layer.getVmsLayer().getVersion());
         message.add(layer.getPublisherIds().size());
-        for (int publisherId : layer.getPublisherIds()) {
-            message.add(publisherId);
-        }
+        message.addAll(layer.getPublisherIds());
     }
 
     private static void appendBytes(ArrayList<Byte> dst, byte[] src) {
diff --git a/service/src/com/android/car/vms/VmsClientManager.java b/service/src/com/android/car/vms/VmsClientManager.java
index 5f5969e..14ef0a7 100644
--- a/service/src/com/android/car/vms/VmsClientManager.java
+++ b/service/src/com/android/car/vms/VmsClientManager.java
@@ -76,7 +76,6 @@
     private final Handler mHandler;
     private final CarUserService mUserService;
     private final CarUserManagerHelper mUserManagerHelper;
-    private final IBinder mHalClient;
     private final int mMillisBeforeRebind;
 
     @GuardedBy("mListeners")
@@ -84,6 +83,8 @@
     @GuardedBy("mSystemClients")
     private final Map<String, ClientConnection> mSystemClients = new ArrayMap<>();
     @GuardedBy("mSystemClients")
+    private IBinder mHalClient;
+    @GuardedBy("mSystemClients")
     private boolean mSystemUserUnlocked;
 
     @GuardedBy("mCurrentUserClients")
@@ -132,9 +133,9 @@
         mHandler = new Handler(Looper.getMainLooper());
         mUserService = userService;
         mUserManagerHelper = userManagerHelper;
-        mHalClient = halService.getPublisherClient();
         mMillisBeforeRebind = mContext.getResources().getInteger(
                 com.android.car.R.integer.millisecondsBeforeRebindToVmsPublisher);
+        halService.setPublisherConnectionCallbacks(this::onHalConnected, this::onHalDisconnected);
     }
 
     @Override
@@ -273,8 +274,10 @@
     }
 
     private void notifyListenerOfConnectedClients(ConnectionListener listener) {
-        listener.onClientConnected(HAL_CLIENT_NAME, mHalClient);
         synchronized (mSystemClients) {
+            if (mHalClient != null) {
+                listener.onClientConnected(HAL_CLIENT_NAME, mHalClient);
+            }
             mSystemClients.values().forEach(conn -> conn.notifyIfConnected(listener));
         }
         synchronized (mCurrentUserClients) {
@@ -298,6 +301,20 @@
         }
     }
 
+    private void onHalConnected(IBinder halClient) {
+        synchronized (mSystemClients) {
+            mHalClient = halClient;
+            notifyListenersOnClientConnected(HAL_CLIENT_NAME, mHalClient);
+        }
+    }
+
+    private void onHalDisconnected() {
+        synchronized (mSystemClients) {
+            mHalClient = null;
+            notifyListenersOnClientDisconnected(HAL_CLIENT_NAME);
+        }
+    }
+
     class ClientConnection implements ServiceConnection {
         private final ComponentName mName;
         private final UserHandle mUser;
diff --git a/tests/carservice_test/src/com/android/car/MockedVmsTestBase.java b/tests/carservice_test/src/com/android/car/MockedVmsTestBase.java
index 4c26e11..2028958 100644
--- a/tests/carservice_test/src/com/android/car/MockedVmsTestBase.java
+++ b/tests/carservice_test/src/com/android/car/MockedVmsTestBase.java
@@ -16,6 +16,7 @@
 package com.android.car;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
 
 import android.car.Car;
 import android.car.VehicleAreaType;
@@ -30,7 +31,9 @@
 import android.hardware.automotive.vehicle.V2_0.VehiclePropertyAccess;
 import android.hardware.automotive.vehicle.V2_0.VehiclePropertyChangeMode;
 import android.hardware.automotive.vehicle.V2_0.VmsAvailabilityStateIntegerValuesIndex;
+import android.hardware.automotive.vehicle.V2_0.VmsBaseMessageIntegerValuesIndex;
 import android.hardware.automotive.vehicle.V2_0.VmsMessageType;
+import android.hardware.automotive.vehicle.V2_0.VmsStartSessionMessageIntegerValuesIndex;
 import android.os.UserHandle;
 import android.util.Log;
 import android.util.Pair;
@@ -86,8 +89,23 @@
         mVmsSubscriberManager.setVmsSubscriberClientCallback(Executors.newSingleThreadExecutor(),
                 mSubscriberClient);
 
-        // Validate layer availability sent to HAL
+        // Validate session handshake
         List<Integer> v = mHalClient.receiveMessage().value.int32Values;
+        assertEquals(VmsMessageType.START_SESSION,
+                (int) v.get(VmsBaseMessageIntegerValuesIndex.MESSAGE_TYPE));
+        int coreId = v.get(VmsStartSessionMessageIntegerValuesIndex.SERVICE_ID);
+        assertTrue(coreId > 0);
+        assertEquals(-1, (int) v.get(VmsStartSessionMessageIntegerValuesIndex.CLIENT_ID));
+
+        // Send handshake acknowledgement
+        mHalClient.sendMessage(
+                VmsMessageType.START_SESSION,
+                coreId,
+                12345 // Client ID
+        );
+
+        // Validate layer availability sent to HAL
+        v = mHalClient.receiveMessage().value.int32Values;
         assertEquals(VmsMessageType.AVAILABILITY_CHANGE,
                 (int) v.get(VmsAvailabilityStateIntegerValuesIndex.MESSAGE_TYPE));
         assertEquals(0,
diff --git a/tests/carservice_unit_test/src/com/android/car/hal/VmsHalServiceTest.java b/tests/carservice_unit_test/src/com/android/car/hal/VmsHalServiceTest.java
index baaab26..f42c52a 100644
--- a/tests/carservice_unit_test/src/com/android/car/hal/VmsHalServiceTest.java
+++ b/tests/carservice_unit_test/src/com/android/car/hal/VmsHalServiceTest.java
@@ -37,14 +37,13 @@
 import android.os.Binder;
 import android.os.IBinder;
 
-import androidx.test.runner.AndroidJUnit4;
-
 import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
-import org.junit.runner.RunWith;
 import org.mockito.ArgumentCaptor;
+import org.mockito.InOrder;
 import org.mockito.Mock;
+import org.mockito.Mockito;
 import org.mockito.junit.MockitoJUnit;
 import org.mockito.junit.MockitoRule;
 
@@ -54,8 +53,8 @@
 import java.util.List;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
 
-@RunWith(AndroidJUnit4.class)
 public class VmsHalServiceTest {
     private static final int LAYER_TYPE = 1;
     private static final int LAYER_SUBTYPE = 2;
@@ -64,6 +63,8 @@
     private static final int PUBLISHER_ID = 12345;
     private static final byte[] PAYLOAD = new byte[]{1, 2, 3, 4};
     private static final List<Byte> PAYLOAD_AS_LIST = Arrays.asList(new Byte[]{1, 2, 3, 4});
+    private static final int CORE_ID = 54321;
+    private static final int CLIENT_ID = 98765;
 
     @Rule
     public MockitoRule mockito = MockitoJUnit.rule();
@@ -73,6 +74,12 @@
     private IVmsPublisherService mPublisherService;
     @Mock
     private IVmsSubscriberService mSubscriberService;
+    @Mock
+    private Consumer<IBinder> mPublisherOnHalConnected;
+    @Mock
+    private Runnable mPublisherOnHalDisconnected;
+    @Mock
+    private Consumer<IVmsSubscriberClient> mSubscriberOnHalDisconnected;
 
     private IBinder mToken;
     private VmsHalService mHalService;
@@ -81,12 +88,10 @@
 
     @Before
     public void setUp() throws Exception {
-        mHalService = new VmsHalService(mVehicleHal);
-        mHalService.setVmsSubscriberService(mSubscriberService);
-
-        mToken = new Binder();
-        mPublisherClient = IVmsPublisherClient.Stub.asInterface(mHalService.getPublisherClient());
-        mPublisherClient.setVmsPublisherService(mToken, mPublisherService);
+        mHalService = new VmsHalService(mVehicleHal, () -> (long) CORE_ID);
+        mHalService.setPublisherConnectionCallbacks(
+                mPublisherOnHalConnected, mPublisherOnHalDisconnected);
+        mHalService.setVmsSubscriberService(mSubscriberService, mSubscriberOnHalDisconnected);
 
         VehiclePropConfig propConfig = new VehiclePropConfig();
         propConfig.prop = VehicleProperty.VEHICLE_MAP_SERVICE;
@@ -97,16 +102,68 @@
         mHalService.init();
         waitForHandlerCompletion();
 
+        // Verify START_SESSION message was sent
+        InOrder initOrder =
+                Mockito.inOrder(mPublisherOnHalConnected, mSubscriberService, mVehicleHal);
+        initOrder.verify(mVehicleHal).subscribeProperty(mHalService,
+                VehicleProperty.VEHICLE_MAP_SERVICE);
+        initOrder.verify(mVehicleHal).set(createHalMessage(
+                VmsMessageType.START_SESSION, // Message type
+                CORE_ID,                      // Core ID
+                -1));                          // Client ID (unknown)
+
+        // Verify no more interections until handshake received
+        initOrder.verifyNoMoreInteractions();
+
+        // Send START_SESSION response from client
+        sendHalMessage(createHalMessage(
+                VmsMessageType.START_SESSION,  // Message type
+                0,                             // Core ID (unknown)
+                CLIENT_ID                      // Client ID
+        ));
+        waitForHandlerCompletion();
+
+        // Verify client is marked as connected
+        ArgumentCaptor<IBinder> publisherCaptor = ArgumentCaptor.forClass(IBinder.class);
+        initOrder.verify(mPublisherOnHalConnected).accept(publisherCaptor.capture());
+        mPublisherClient = IVmsPublisherClient.Stub.asInterface(publisherCaptor.getValue());
+
+        mToken = new Binder();
+        mPublisherClient.setVmsPublisherService(mToken, mPublisherService);
+
         ArgumentCaptor<IVmsSubscriberClient> subscriberCaptor = ArgumentCaptor.forClass(
                 IVmsSubscriberClient.class);
-        verify(mSubscriberService).addVmsSubscriberToNotifications(subscriberCaptor.capture());
+        initOrder.verify(mSubscriberService).addVmsSubscriberToNotifications(
+                subscriberCaptor.capture());
         mSubscriberClient = subscriberCaptor.getValue();
-        reset(mSubscriberService);
-        verify(mVehicleHal).set(createHalMessage(
+
+        initOrder.verify(mSubscriberService).getAvailableLayers();
+        initOrder.verify(mVehicleHal).set(createHalMessage(
                 VmsMessageType.AVAILABILITY_CHANGE, // Message type
                 0,                                  // Sequence number
                 0));                                // # of associated layers
-        reset(mVehicleHal);
+
+        initOrder.verifyNoMoreInteractions();
+        reset(mPublisherOnHalConnected, mSubscriberService, mVehicleHal);
+    }
+
+    @Test
+    public void testCoreId_IntegerOverflow() throws Exception {
+        mHalService = new VmsHalService(mVehicleHal, () -> (long) Integer.MAX_VALUE + CORE_ID);
+
+        VehiclePropConfig propConfig = new VehiclePropConfig();
+        propConfig.prop = VehicleProperty.VEHICLE_MAP_SERVICE;
+        mHalService.takeSupportedProperties(Collections.singleton(propConfig));
+
+        when(mSubscriberService.getAvailableLayers()).thenReturn(
+                new VmsAvailableLayers(Collections.emptySet(), 0));
+        mHalService.init();
+        waitForHandlerCompletion();
+
+        verify(mVehicleHal).set(createHalMessage(
+                VmsMessageType.START_SESSION, // Message type
+                CORE_ID,                      // Core ID
+                -1));                          // Client ID (unknown)
     }
 
     @Test
@@ -506,6 +563,40 @@
     }
 
     /**
+     * START_SESSION message format:
+     * <ul>
+     * <li>Message type
+     * <li>Core ID
+     * <li>Client ID
+     * </ul>
+     */
+    @Test
+    public void testHandleStartSessionEvent() throws Exception {
+        when(mSubscriberService.getAvailableLayers()).thenReturn(
+                new VmsAvailableLayers(Collections.emptySet(), 5));
+
+        VehiclePropValue request = createHalMessage(
+                VmsMessageType.START_SESSION,  // Message type
+                0,                             // Core ID (unknown)
+                CLIENT_ID                      // Client ID
+        );
+
+        VehiclePropValue response = createHalMessage(
+                VmsMessageType.START_SESSION,  // Message type
+                CORE_ID,                               // Core ID
+                CLIENT_ID                              // Client ID
+        );
+
+        sendHalMessage(request);
+        InOrder inOrder = Mockito.inOrder(mVehicleHal);
+        inOrder.verify(mVehicleHal).set(response);
+        inOrder.verify(mVehicleHal).set(createHalMessage(
+                VmsMessageType.AVAILABILITY_CHANGE, // Message type
+                5,                                  // Sequence number
+                0));                                // # of associated layers
+    }
+
+    /**
      * AVAILABILITY_CHANGE message format:
      * <ul>
      * <li>Message type
@@ -865,9 +956,7 @@
 
     private void waitForHandlerCompletion() throws Exception {
         final CountDownLatch latch = new CountDownLatch(1);
-        mHalService.getHandler().post(() -> {
-            latch.countDown();
-        });
+        mHalService.getHandler().post(latch::countDown);
         latch.await(5, TimeUnit.SECONDS);
     }
 }
diff --git a/tests/carservice_unit_test/src/com/android/car/vms/VmsClientManagerTest.java b/tests/carservice_unit_test/src/com/android/car/vms/VmsClientManagerTest.java
index 6519f0c..772abc7 100644
--- a/tests/carservice_unit_test/src/com/android/car/vms/VmsClientManagerTest.java
+++ b/tests/carservice_unit_test/src/com/android/car/vms/VmsClientManagerTest.java
@@ -60,6 +60,8 @@
 import org.mockito.junit.MockitoJUnit;
 import org.mockito.junit.MockitoRule;
 
+import java.util.function.Consumer;
+
 @SmallTest
 public class VmsClientManagerTest {
     private static final String HAL_CLIENT_NAME = "VmsHalClient";
@@ -84,7 +86,8 @@
 
     @Mock
     private VmsHalService mHal;
-    private IBinder mHalClient;
+    private Consumer<IBinder> mHalClientConnected;
+    private Runnable mHalClientDisconnected;
 
     @Mock
     private VmsClientManager.ConnectionListener mConnectionListener;
@@ -115,13 +118,18 @@
         mUserId = 10;
         when(mUserManager.getCurrentForegroundUserId()).thenAnswer((invocation) -> mUserId);
 
-        mHalClient = new Binder();
-        when(mHal.getPublisherClient()).thenReturn(mHalClient);
-
         mClientManager = new VmsClientManager(mContext, mUserService, mUserManager, mHal);
         mClientManager.registerConnectionListener(mConnectionListener);
-        verify(mConnectionListener).onClientConnected(HAL_CLIENT_NAME, mHalClient);
-        reset(mConnectionListener);
+
+        @SuppressWarnings("unchecked")
+        ArgumentCaptor<Consumer<IBinder>> onClientConnectedCaptor =
+                ArgumentCaptor.forClass(Consumer.class);
+        ArgumentCaptor<Runnable> onClientDisconnectedCaptor =
+                ArgumentCaptor.forClass(Runnable.class);
+        verify(mHal).setPublisherConnectionCallbacks(
+                onClientConnectedCaptor.capture(), onClientDisconnectedCaptor.capture());
+        mHalClientConnected = onClientConnectedCaptor.getValue();
+        mHalClientDisconnected = onClientDisconnectedCaptor.getValue();
     }
 
     @After
@@ -130,6 +138,7 @@
         verify(mContext, atLeast(0)).getResources();
         verify(mContext, atLeast(0)).getPackageManager();
         verifyNoMoreInteractions(mContext);
+        verifyNoMoreInteractions(mHal);
     }
 
     @Test
@@ -162,18 +171,28 @@
         VmsClientManager.ConnectionListener listener =
                 Mockito.mock(VmsClientManager.ConnectionListener.class);
         mClientManager.registerConnectionListener(listener);
-        verify(listener).onClientConnected(HAL_CLIENT_NAME, mHalClient);
+    }
+
+    @Test
+    public void testRegisterConnectionListener_AfterHalClientConnected() {
+        IBinder halClient = bindHalClient();
+
+        VmsClientManager.ConnectionListener listener =
+                Mockito.mock(VmsClientManager.ConnectionListener.class);
+        mClientManager.registerConnectionListener(listener);
+        verify(listener).onClientConnected(HAL_CLIENT_NAME, halClient);
     }
 
     @Test
     public void testRegisterConnectionListener_AfterClientsConnected() {
+        IBinder halClient = bindHalClient();
         IBinder systemBinder = bindSystemClient();
         IBinder userBinder = bindUserClient();
 
         VmsClientManager.ConnectionListener listener =
                 Mockito.mock(VmsClientManager.ConnectionListener.class);
         mClientManager.registerConnectionListener(listener);
-        verify(listener).onClientConnected(HAL_CLIENT_NAME, mHalClient);
+        verify(listener).onClientConnected(HAL_CLIENT_NAME, halClient);
         verify(listener).onClientConnected(eq(SYSTEM_CLIENT_NAME), eq(systemBinder));
         verify(listener).onClientConnected(eq(USER_CLIENT_NAME), eq(userBinder));
     }
@@ -334,6 +353,18 @@
     }
 
     @Test
+    public void testHalClientConnected() {
+        IBinder binder = bindHalClient();
+        verify(mConnectionListener).onClientConnected(eq(HAL_CLIENT_NAME), eq(binder));
+    }
+
+    private IBinder bindHalClient() {
+        IBinder binder = new Binder();
+        mHalClientConnected.accept(binder);
+        return binder;
+    }
+
+    @Test
     public void testOnSystemServiceConnected() {
         IBinder binder = bindSystemClient();
         verify(mConnectionListener).onClientConnected(eq(SYSTEM_CLIENT_NAME), eq(binder));
@@ -368,6 +399,14 @@
     }
 
     @Test
+    public void testOnHalClientDisconnected() throws Exception {
+        bindHalClient();
+        mHalClientDisconnected.run();
+
+        verify(mConnectionListener).onClientDisconnected(eq(HAL_CLIENT_NAME));
+    }
+
+    @Test
     public void testOnSystemServiceDisconnected() throws Exception {
         notifySystemUserUnlocked();
         verifySystemBind(1);