PBAP PCE multiconnect tests

Test PBAP client when connecting multiple phones simultaneously.  This
test loads contacts onto two phones, pairs them and subsequently
connects PBAP and verifies contacts are transfered correctly.

Bug: 33075066
Test: This is the test.
Change-Id: I55bb393eea22e4d95ba3e62f89a44b34400b3c70
diff --git a/acts/tests/google/bt/car_bt/BtCarPbapTest.py b/acts/tests/google/bt/car_bt/BtCarPbapTest.py
index dc98968..bacf4c4 100644
--- a/acts/tests/google/bt/car_bt/BtCarPbapTest.py
+++ b/acts/tests/google/bt/car_bt/BtCarPbapTest.py
@@ -20,6 +20,7 @@
 import time
 
 from acts.test_utils.bt.BluetoothBaseTest import BluetoothBaseTest
+from acts.test_utils.bt.bt_test_utils import setup_multiple_devices_for_bt_test
 from acts.base_test import BaseTestClass
 from acts.test_utils.bt import bt_contacts_utils
 from acts.test_utils.bt import bt_test_utils
@@ -31,6 +32,8 @@
 CALL_LOG_TIME_OFFSET_IN_MSEC = 60000
 PSE_CONTACTS_FILE = "psecontacts.vcf"
 PCE_CONTACTS_FILE = "pcecontacts.vcf"
+MERGED_CONTACTS_FILE = "psecombined.vcf"
+STANDART_CONTACT_COUNT = 100
 
 
 class BtCarPbapTest(BluetoothBaseTest):
@@ -40,46 +43,61 @@
         BaseTestClass.__init__(self, controllers)
         self.pce = self.android_devices[0]
         self.pse = self.android_devices[1]
+        self.pse2 = self.android_devices[2]
         self.contacts_destination_path = self.log_path + "/"
 
     def setup_class(self):
         if not super(BtCarPbapTest, self).setup_class():
             return False
-        permissions_list = ["android.permission.READ_CONTACTS",
-                            "android.permission.WRITE_CONTACTS",
-                            "android.permission.READ_EXTERNAL_STORAGE"]
+        permissions_list = [
+            "android.permission.READ_CONTACTS",
+            "android.permission.WRITE_CONTACTS",
+            "android.permission.READ_EXTERNAL_STORAGE"
+        ]
         for permission in permissions_list:
             self.pse.adb.shell(
                 "pm grant com.google.android.contacts {}".format(permission))
         for permission in permissions_list:
-            self.pce.adb.shell(
-                "pm grant com.android.contacts {}".format(permission))
-
+            self.pce.adb.shell("pm grant com.android.contacts {}".format(
+                permission))
 
         # Pair the devices.
         # This call may block until some specified timeout in bt_test_utils.py.
         # Grace time inbetween stack state changes
 
-        time.sleep(5)
-        if not bt_test_utils.pair_pri_to_sec(
-                self.pce, self.pse, attempts=4, auto_confirm=False):
-            self.log.error("Failed to pair devices.")
+        setup_multiple_devices_for_bt_test(self.android_devices)
+        if not bt_test_utils.pair_pri_to_sec(self.pce, self.pse):
+            self.log.error("Failed to pair.")
+            return False
+        time.sleep(3)
+        if not bt_test_utils.pair_pri_to_sec(self.pce, self.pse2):
+            self.log.error("Failed to pair.")
             return False
 
         # Disable the HFP and A2DP profiles. This will ensure only PBAP
         # gets connected. Also, this will eliminate the auto-connect loop.
         car_bt_utils.set_car_profile_priorities_off(self.pce, self.pse)
+        car_bt_utils.set_car_profile_priorities_off(self.pce, self.pse2)
 
         # Enable PBAP on PSE & PCE.
-        bt_test_utils.set_profile_priority(
-            self.pce, self.pse, [BtEnum.BluetoothProfile.PBAP_CLIENT],
-            BtEnum.BluetoothPriorityLevel.PRIORITY_ON)
 
         self.pse.droid.bluetoothChangeProfileAccessPermission(
             self.pce.droid.bluetoothGetLocalAddress(),
             BtEnum.BluetoothProfile.PBAP_SERVER.value,
             BtEnum.BluetoothAccessLevel.ACCESS_ALLOWED.value)
 
+        self.pse2.droid.bluetoothChangeProfileAccessPermission(
+            self.pce.droid.bluetoothGetLocalAddress(),
+            BtEnum.BluetoothProfile.PBAP_SERVER.value,
+            BtEnum.BluetoothAccessLevel.ACCESS_ALLOWED.value)
+
+        bt_test_utils.set_profile_priority(
+            self.pce, self.pse, [BtEnum.BluetoothProfile.PBAP_CLIENT],
+            BtEnum.BluetoothPriorityLevel.PRIORITY_ON)
+        bt_test_utils.set_profile_priority(
+            self.pce, self.pse2, [BtEnum.BluetoothProfile.PBAP_CLIENT],
+            BtEnum.BluetoothPriorityLevel.PRIORITY_ON)
+
         return True
 
     def setup_test(self):
@@ -150,8 +168,7 @@
         if bt_test_utils.connect_pri_to_sec(
                 self.pce, self.pse,
                 set([BtEnum.BluetoothProfile.PBAP_CLIENT.value])):
-            self.pce.log.error(
-                "Client connected. Expected to not be connected.")
+            self.log.error("Client connected and shouldn't be.")
             return False
 
         self.pce.droid.bluetoothPbapClientDisconnect(
@@ -162,10 +179,10 @@
             BtEnum.BluetoothProfile.PBAP_SERVER.value,
             BtEnum.BluetoothAccessLevel.ACCESS_ALLOWED.value)
 
-        if not bt_test_utils.connect_pri_to_sec(self.pce, self.pse, set(
-            [BtEnum.BluetoothProfile.PBAP_CLIENT.value])):
-            self.pce.log.error(
-                "No PBAP client connected. Expected to be connected.")
+        if not bt_test_utils.connect_pri_to_sec(
+                self.pce, self.pse,
+                set([BtEnum.BluetoothProfile.PBAP_CLIENT.value])):
+            self.log.error("No client connected and should be.")
             return False
 
         return True
@@ -356,11 +373,16 @@
             bt_contacts_utils.generate_random_phone_number().phone_number,
             int(time.time()) * 1000 - 2 * CALL_LOG_TIME_OFFSET_IN_MSEC)
 
+        self.pce.droid.bluetoothPbapClientDisconnect(
+            self.pse.droid.bluetoothGetLocalAddress())
+        self.pce.droid.bluetoothPbapClientDisconnect(
+            self.pse2.droid.bluetoothGetLocalAddress())
+
         bt_test_utils.connect_pri_to_sec(
             self.pce, self.pse,
             set([BtEnum.BluetoothProfile.PBAP_CLIENT.value]))
         pse_call_log_count = self.pse.droid.callLogGetCount()
-        self.pse.log.info("Waiting for {} call logs to be transfered".format(
+        self.log.info("Waiting for {} call logs to be transfered".format(
             pse_call_log_count))
         bt_contacts_utils.wait_for_call_log_update_complete(self.pce,
                                                             pse_call_log_count)
@@ -376,3 +398,90 @@
             return False
 
         return True
+
+    def test_multiple_phones(self):
+        """Test Multiple Phones
+
+        Test that connects two phones and confirms contacts are transfered
+        and merged while still being associated with their original phone.
+
+        Precondition:
+        1. Devices are paired.
+
+        Steps:
+        1. Add a unique list of contacts to PSE on each phone.
+        2. Connect PCE to PSE 1 to perform transfer.
+        3. Verify contacts match.
+        4. Connect PCE to PSE 2 to perform transfer.
+        5. Verify that the PCE has a union set of contacts from
+           PSE 1 and PSE 2.
+        6. Disconnect PCE from PSE 1 to clean up contacts.
+        7. Verify that only PSE 2 contacts remain on PCE and they match.
+        8. Disconnect PCE from PSE 2 to clean up contacts.
+
+        Returns:
+           Pass if True
+           Fail if False
+        """
+
+        PSE1_CONTACTS_FILE = "{}{}".format(PSE_CONTACTS_FILE, "1")
+        PSE2_CONTACTS_FILE = "{}{}".format(PSE_CONTACTS_FILE, "2")
+
+        bt_contacts_utils.generate_contact_list(self.contacts_destination_path,
+                                                PSE1_CONTACTS_FILE, 100)
+        phone_numbers_added = bt_contacts_utils.import_device_contacts_from_vcf(
+            self.pse, self.contacts_destination_path, PSE1_CONTACTS_FILE)
+        bt_contacts_utils.generate_contact_list(self.contacts_destination_path,
+                                                PSE2_CONTACTS_FILE, 100)
+        phone_numbers_added = bt_contacts_utils.import_device_contacts_from_vcf(
+            self.pse2, self.contacts_destination_path, PSE2_CONTACTS_FILE)
+
+        self.pce.droid.bluetoothPbapClientDisconnect(
+            self.pse.droid.bluetoothGetLocalAddress())
+        self.pce.droid.bluetoothPbapClientDisconnect(
+            self.pse2.droid.bluetoothGetLocalAddress())
+
+        bt_test_utils.connect_pri_to_sec(
+            self.pce, self.pse,
+            set([BtEnum.BluetoothProfile.PBAP_CLIENT.value]))
+        bt_contacts_utils.wait_for_phone_number_update_complete(self.pce, 100)
+        bt_contacts_utils.export_device_contacts_to_vcf(
+            self.pce, self.contacts_destination_path, PCE_CONTACTS_FILE)
+        pse1_matches = bt_contacts_utils.count_contacts_with_differences(
+            self.contacts_destination_path, PCE_CONTACTS_FILE,
+            PSE1_CONTACTS_FILE) == 0
+
+        bt_test_utils.connect_pri_to_sec(
+            self.pce, self.pse2,
+            set([BtEnum.BluetoothProfile.PBAP_CLIENT.value]))
+
+        bt_contacts_utils.wait_for_phone_number_update_complete(self.pce, 200)
+
+        bt_contacts_utils.export_device_contacts_to_vcf(
+            self.pce, self.contacts_destination_path, PCE_CONTACTS_FILE)
+
+        merged_file = open('{}{}'.format(self.contacts_destination_path,
+                                         MERGED_CONTACTS_FILE), 'w')
+        for contacts_file in [PSE1_CONTACTS_FILE, PSE2_CONTACTS_FILE]:
+            infile = open(self.contacts_destination_path + contacts_file)
+            merged_file.write(infile.read())
+
+        self.log.info("Checking combined phonebook.")
+        pse1andpse2_matches = bt_contacts_utils.count_contacts_with_differences(
+            self.contacts_destination_path, PCE_CONTACTS_FILE,
+            MERGED_CONTACTS_FILE) == 0
+
+        self.pce.droid.bluetoothPbapClientDisconnect(
+            self.pse.droid.bluetoothGetLocalAddress())
+        bt_contacts_utils.wait_for_phone_number_update_complete(self.pce, 100)
+
+        self.log.info("Checking phonebook after disconnecting first device.")
+        bt_contacts_utils.export_device_contacts_to_vcf(
+            self.pce, self.contacts_destination_path, PCE_CONTACTS_FILE)
+        pse2_matches = bt_contacts_utils.count_contacts_with_differences(
+            self.contacts_destination_path, PCE_CONTACTS_FILE,
+            PSE2_CONTACTS_FILE) == 0
+
+        bt_contacts_utils.erase_contacts(self.pse)
+        bt_contacts_utils.erase_contacts(self.pse2)
+        return pse1_matches and pse2_matches and pse1andpse2_matches