| #/usr/bin/env python3.4 |
| # |
| # Copyright (C) 2016 The Android Open Source Project |
| # |
| # Licensed under the Apache License, Version 2.0 (the "License"); you may not |
| # use this file except in compliance with the License. You may obtain a copy of |
| # the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT |
| # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the |
| # License for the specific language governing permissions and limitations under |
| # the License. |
| """ |
| This test script is for partial automation of LE devices |
| |
| This script requires these custom parameters in the config file: |
| |
| "ble_mac_address" |
| "service_uuid" |
| "notifiable_char_uuid" |
| """ |
| |
| import pprint |
| from queue import Empty |
| import time |
| |
| from acts.test_utils.bt.bt_constants import ble_scan_settings_modes |
| from acts.test_utils.bt.BluetoothBaseTest import BluetoothBaseTest |
| from acts.test_utils.bt.bt_constants import gatt_cb_err |
| from acts.test_utils.bt.bt_constants import gatt_cb_strings |
| from acts.test_utils.bt.bt_constants import gatt_descriptor |
| from acts.test_utils.bt.bt_constants import gatt_transport |
| from acts.test_utils.bt.bt_gatt_utils import GattTestUtilsError |
| from acts.test_utils.bt.bt_gatt_utils import disconnect_gatt_connection |
| from acts.test_utils.bt.bt_test_utils import generate_ble_scan_objects |
| from acts.test_utils.bt.bt_gatt_utils import setup_gatt_connection |
| from acts.test_utils.bt.bt_gatt_utils import log_gatt_server_uuids |
| from acts.test_utils.bt.bt_test_utils import reset_bluetooth |
| from acts.test_utils.bt.bt_constants import scan_result |
| |
| |
| class GattToolTest(BluetoothBaseTest): |
| AUTOCONNECT = False |
| DEFAULT_TIMEOUT = 10 |
| PAIRING_TIMEOUT = 20 |
| adv_instances = [] |
| timer_list = [] |
| |
| def __init__(self, controllers): |
| BluetoothBaseTest.__init__(self, controllers) |
| # Central role Android device |
| self.cen_ad = self.android_devices[0] |
| self.ble_mac_address = self.user_params['ble_mac_address'] |
| self.SERVICE_UUID = self.user_params['service_uuid'] |
| self.NOTIFIABLE_CHAR_UUID = self.user_params['notifiable_char_uuid'] |
| # CCC == Client Characteristic Configuration |
| self.CCC_DESC_UUID = "00002902-0000-1000-8000-00805f9b34fb" |
| |
| def setup_test(self): |
| super(BluetoothBaseTest, self).setup_test() |
| if not self._is_peripheral_advertising(): |
| input("Press enter when peripheral is advertising...") |
| return True |
| |
| def teardown_test(self): |
| super(BluetoothBaseTest, self).teardown_test() |
| self.log_stats() |
| self.timer_list = [] |
| return True |
| |
| def _pair_with_peripheral(self): |
| self.cen_ad.droid.bluetoothDiscoverAndBond(self.ble_mac_address) |
| end_time = time.time() + self.PAIRING_TIMEOUT |
| self.log.info("Verifying devices are bonded") |
| while time.time() < end_time: |
| bonded_devices = self.cen_ad.droid.bluetoothGetBondedDevices() |
| if self.ble_mac_address in {d['address'] for d in bonded_devices}: |
| self.log.info("Successfully bonded to device") |
| return True |
| return False |
| |
| def _is_peripheral_advertising(self): |
| self.cen_ad.droid.bleSetScanFilterDeviceAddress(self.ble_mac_address) |
| self.cen_ad.droid.bleSetScanSettingsScanMode(ble_scan_settings_modes[ |
| 'low_latency']) |
| filter_list, scan_settings, scan_callback = generate_ble_scan_objects( |
| self.cen_ad.droid) |
| self.cen_ad.droid.bleBuildScanFilter(filter_list) |
| |
| self.cen_ad.droid.bleStartBleScan(filter_list, scan_settings, |
| scan_callback) |
| expected_event_name = scan_result.format(scan_callback) |
| test_result = True |
| try: |
| self.cen_ad.ed.pop_event(expected_event_name, self.DEFAULT_TIMEOUT) |
| self.log.info("Peripheral found with event: {}".format( |
| expected_event_name)) |
| except Empty: |
| self.log.info("Peripheral not advertising or not found: {}".format( |
| self.ble_mac_address)) |
| test_result = False |
| self.cen_ad.droid.bleStopBleScan(scan_callback) |
| return test_result |
| |
| def _unbond_device(self): |
| self.cen_ad.droid.bluetoothUnbond(self.ble_mac_address) |
| time.sleep(2) #Grace timeout for unbonding to finish |
| bonded_devices = self.cen_ad.droid.bluetoothGetBondedDevices() |
| if bonded_devices: |
| self.log.error("Failed to unbond device... found: {}".format( |
| bonded_devices)) |
| return False |
| return True |
| |
| @BluetoothBaseTest.bt_test_wrap |
| def test_gatt_connect_without_scanning(self): |
| """Test the round trip speed of connecting to a peripheral |
| |
| This test will prompt the user to press "Enter" when the |
| peripheral is in a connecable advertisement state. Once |
| the user presses enter, this script will measure the amount |
| of time it takes to establish a GATT connection to the |
| peripheral. The test will then disconnect |
| |
| Steps: |
| 1. Wait for user input to confirm peripheral is advertising. |
| 2. Start timer |
| 3. Perform GATT connection to peripheral |
| 4. Upon successful connection, stop timer |
| 5. Disconnect from peripheral |
| |
| Expected Result: |
| Device should be connected successfully |
| |
| Returns: |
| Pass if True |
| Fail if False |
| |
| TAGS: LE, GATT |
| Priority: 1 |
| """ |
| self.AUTOCONNECT = False |
| start_time = self._get_time_in_milliseconds() |
| try: |
| bluetooth_gatt, gatt_callback = ( |
| setup_gatt_connection(self.cen_ad, self.ble_mac_address, |
| self.AUTOCONNECT, gatt_transport['le'])) |
| except GattTestUtilsError as err: |
| self.log.error(err) |
| return False |
| end_time = self._get_time_in_milliseconds() |
| self.log.info("Total time (ms): {}".format(end_time - start_time)) |
| try: |
| disconnect_gatt_connection(self.cen_ad, bluetooth_gatt, |
| gatt_callback) |
| self.cen_ad.droid.gattClientClose(bluetooth_gatt) |
| except GattTestUtilsError as err: |
| self.log.error(err) |
| return False |
| self.cen_ad.droid.gattClientClose(bluetooth_gatt) |
| |
| @BluetoothBaseTest.bt_test_wrap |
| def test_gatt_connect_stress(self): |
| """Test the round trip speed of connecting to a peripheral many times |
| |
| This test will prompt the user to press "Enter" when the |
| peripheral is in a connecable advertisement state. Once |
| the user presses enter, this script will measure the amount |
| of time it takes to establish a GATT connection to the |
| peripheral. The test will then disconnect. It will attempt to |
| repeat this process multiple times. |
| |
| Steps: |
| 1. Wait for user input to confirm peripheral is advertising. |
| 2. Start timer |
| 3. Perform GATT connection to peripheral |
| 4. Upon successful connection, stop timer |
| 5. Disconnect from peripheral |
| 6. Repeat steps 2-5 1000 times. |
| |
| Expected Result: |
| Test should measure 1000 iterations of connect/disconnect cycles. |
| |
| Returns: |
| Pass if True |
| Fail if False |
| |
| TAGS: LE, GATT |
| Priority: 2 |
| """ |
| filter_list, scan_settings, scan_callback = generate_ble_scan_objects( |
| self.cen_ad.droid) |
| self.cen_ad.droid.bleStartBleScan(filter_list, scan_settings, |
| scan_callback) |
| self.AUTOCONNECT = False |
| iterations = 1000 |
| n = 0 |
| while n < iterations: |
| self.start_timer() |
| try: |
| bluetooth_gatt, gatt_callback = (setup_gatt_connection( |
| self.cen_ad, self.ble_mac_address, self.AUTOCONNECT, |
| gatt_transport['le'])) |
| except GattTestUtilsError as err: |
| self.log.error(err) |
| return False |
| self.log.info("Total time (ms): {}".format(self.end_timer())) |
| try: |
| disconnect_gatt_connection(self.cen_ad, bluetooth_gatt, |
| gatt_callback) |
| self.cen_ad.droid.gattClientClose(bluetooth_gatt) |
| except GattTestUtilsError as err: |
| self.log.error(err) |
| return False |
| n += 1 |
| return True |
| |
| @BluetoothBaseTest.bt_test_wrap |
| def test_gatt_connect_iterate_uuids(self): |
| """Test the discovery of uuids of a peripheral |
| |
| This test will prompt the user to press "Enter" when the |
| peripheral is in a connecable advertisement state. Once |
| the user presses enter, this script connects an Android device |
| to the periphal and attempt to discover all services, |
| characteristics, and descriptors. |
| |
| Steps: |
| 1. Wait for user input to confirm peripheral is advertising. |
| 2. Perform GATT connection to peripheral |
| 3. Upon successful connection, iterate through all services, |
| characteristics, and descriptors. |
| 5. Disconnect from peripheral |
| |
| Expected Result: |
| Device services, characteristics, and descriptors should all |
| be read. |
| |
| Returns: |
| Pass if True |
| Fail if False |
| |
| TAGS: LE, GATT |
| Priority: 2 |
| """ |
| try: |
| bluetooth_gatt, gatt_callback = ( |
| setup_gatt_connection(self.cen_ad, self.ble_mac_address, |
| self.AUTOCONNECT, gatt_transport['le'])) |
| except GattTestUtilsError as err: |
| self.log.error(err) |
| return False |
| if self.cen_ad.droid.gattClientDiscoverServices(bluetooth_gatt): |
| expected_event = gatt_cb_strings['gatt_serv_disc'].format( |
| gatt_callback) |
| try: |
| event = self.cen_ad.ed.pop_event(expected_event, |
| self.DEFAULT_TIMEOUT) |
| discovered_services_index = event['data']['ServicesIndex'] |
| except Empty: |
| self.log.error(gatt_cb_err['gatt_serv_disc'].format( |
| expected_event)) |
| return False |
| log_gatt_server_uuids(self.cen_ad, discovered_services_index) |
| try: |
| disconnect_gatt_connection(self.cen_ad, bluetooth_gatt, |
| gatt_callback) |
| self.cen_ad.droid.gattClientClose(bluetooth_gatt) |
| except GattTestUtilsError as err: |
| self.log.error(err) |
| return False |
| self.cen_ad.droid.gattClientClose(bluetooth_gatt) |
| return True |
| |
| @BluetoothBaseTest.bt_test_wrap |
| def test_pairing(self): |
| """Test pairing to a GATT mac address |
| |
| This test will prompt the user to press "Enter" when the |
| peripheral is in a connecable advertisement state. Once |
| the user presses enter, this script will bond the Android device |
| to the peripheral. |
| |
| Steps: |
| 1. Wait for user input to confirm peripheral is advertising. |
| 2. Perform Bluetooth pairing to GATT mac address |
| 3. Upon successful bonding. |
| 4. Unbond from device |
| |
| Expected Result: |
| Device services, characteristics, and descriptors should all |
| be read. |
| |
| Returns: |
| Pass if True |
| Fail if False |
| |
| TAGS: LE, GATT |
| Priority: 1 |
| """ |
| if not self._pair_with_peripheral(): |
| return False |
| self.cen_ad.droid.bluetoothUnbond(self.ble_mac_address) |
| return self._unbond_device() |
| |
| @BluetoothBaseTest.bt_test_wrap |
| def test_pairing_stress(self): |
| """Test the round trip speed of pairing to a peripheral many times |
| |
| This test will prompt the user to press "Enter" when the |
| peripheral is in a connecable advertisement state. Once |
| the user presses enter, this script will measure the amount |
| of time it takes to establish a pairing with a BLE device. |
| |
| Steps: |
| 1. Wait for user input to confirm peripheral is advertising. |
| 2. Start timer |
| 3. Perform Bluetooth pairing to GATT mac address |
| 4. Upon successful bonding, stop timer. |
| 5. Unbond from device |
| 6. Repeat steps 2-5 100 times. |
| |
| Expected Result: |
| Test should measure 100 iterations of bonding. |
| |
| Returns: |
| Pass if True |
| Fail if False |
| |
| TAGS: LE, GATT |
| Priority: 3 |
| """ |
| iterations = 100 |
| for _ in range(iterations): |
| start_time = self.start_timer() |
| if not self._pair_with_peripheral(): |
| return False |
| self.log.info("Total time (ms): {}".format(self.end_timer())) |
| if not self._unbond_device(): |
| return False |
| return True |
| |
| @BluetoothBaseTest.bt_test_wrap |
| def test_gatt_notification_longev(self): |
| """Test GATT characterisitic notifications for long periods of time |
| |
| This test will prompt the user to press "Enter" when the |
| peripheral is in a connecable advertisement state. Once |
| the user presses enter, this script aims to set characteristic |
| notification to true on the config file's SERVICE_UUID, |
| NOTIFIABLE_CHAR_UUID, and CCC_DESC_UUID. This test assumes |
| the peripheral will constantly write data to a notifiable |
| characteristic. |
| |
| Steps: |
| 1. Wait for user input to confirm peripheral is advertising. |
| 2. Perform Bluetooth pairing to GATT mac address |
| 3. Perform a GATT connection to the periheral |
| 4. Get the discovered service uuid that matches the user's input |
| in the config file |
| 4. Write to the CCC descriptor to enable notifications |
| 5. Enable notifications on the user's input Characteristic UUID |
| 6. Continuously wait for Characteristic Changed events which |
| equate to recieving notifications for 15 minutes. |
| |
| Expected Result: |
| There should be no disconnects and we should constantly receive |
| Characteristic Changed information. Values should vary upon user |
| interaction with the peripheral. |
| |
| Returns: |
| Pass if True |
| Fail if False |
| |
| TAGS: LE, GATT |
| Priority: 1 |
| """ |
| #pair devices |
| if not self._pair_with_peripheral(): |
| return False |
| try: |
| bluetooth_gatt, gatt_callback = ( |
| setup_gatt_connection(self.cen_ad, self.ble_mac_address, |
| self.AUTOCONNECT, gatt_transport['le'])) |
| except GattTestUtilsError as err: |
| self.log.error(err) |
| return False |
| if self.cen_ad.droid.gattClientDiscoverServices(bluetooth_gatt): |
| expected_event = gatt_cb_strings['gatt_serv_disc'].format( |
| gatt_callback) |
| try: |
| event = self.cen_ad.ed.pop_event(expected_event, |
| self.DEFAULT_TIMEOUT) |
| discovered_services_index = event['data']['ServicesIndex'] |
| except Empty: |
| self.log.error(gatt_cb_err['gatt_serv_disc'].format( |
| expected_event)) |
| return False |
| # TODO: in setup save service_cound and discovered_services_index |
| # programatically |
| services_count = self.cen_ad.droid.gattClientGetDiscoveredServicesCount( |
| discovered_services_index) |
| test_service_index = None |
| for i in range(services_count): |
| disc_service_uuid = ( |
| self.cen_ad.droid.gattClientGetDiscoveredServiceUuid( |
| discovered_services_index, i)) |
| if disc_service_uuid == self.SERVICE_UUID: |
| test_service_index = i |
| break |
| if not test_service_index: |
| self.log.error("Service not found.") |
| return False |
| |
| self.cen_ad.droid.gattClientDescriptorSetValue( |
| bluetooth_gatt, discovered_services_index, test_service_index, |
| self.NOTIFIABLE_CHAR_UUID, self.CCC_DESC_UUID, |
| gatt_descriptor['enable_notification_value']) |
| |
| self.cen_ad.droid.gattClientWriteDescriptor( |
| bluetooth_gatt, discovered_services_index, test_service_index, |
| self.NOTIFIABLE_CHAR_UUID, self.CCC_DESC_UUID) |
| |
| self.cen_ad.droid.gattClientSetCharacteristicNotification( |
| bluetooth_gatt, discovered_services_index, test_service_index, |
| self.NOTIFIABLE_CHAR_UUID, True) |
| |
| # set 15 minute notification test time |
| notification_test_time = 900 |
| end_time = time.time() + notification_test_time |
| expected_event = GattCbStrings.CHAR_CHANGE.value.format(gatt_callback) |
| while time.time() < end_time: |
| try: |
| event = self.cen_ad.ed.pop_event(expected_event, |
| self.DEFAULT_TIMEOUT) |
| self.log.info(event) |
| except Empty as err: |
| print(err) |
| self.log.error( |
| GattCbStrings.CHAR_CHANGE_ERR.value.format(expected_event)) |
| return False |
| return True |