Add logging and improve tests
Improve robustness of RFCOMM tests.
Add new Gatt connection test for autoconnect.
Add new logging to get more info.
Change-Id: Id83df2c2890e3e4f267186435332fc318e48f7e5
(cherry picked from commit e3170f0f3cf4faae77ccba8b3b8d66df0680ab65)
diff --git a/acts/framework/acts/test_utils/bt/BtEnum.py b/acts/framework/acts/test_utils/bt/BtEnum.py
index f6939f3..0f69037 100644
--- a/acts/framework/acts/test_utils/bt/BtEnum.py
+++ b/acts/framework/acts/test_utils/bt/BtEnum.py
@@ -32,3 +32,6 @@
STATE_BLE_TURNING_ON = 14
STATE_BLE_ON = 15
STATE_BLE_TURNING_OFF = 16
+
+class RfcommUuid(Enum):
+ DEFAULT_UUID = "457807c0-4897-11df-9879-0800200c9a66"
diff --git a/acts/framework/acts/test_utils/bt/bt_gatt_utils.py b/acts/framework/acts/test_utils/bt/bt_gatt_utils.py
index d6f7bf1..e95833d 100644
--- a/acts/framework/acts/test_utils/bt/bt_gatt_utils.py
+++ b/acts/framework/acts/test_utils/bt/bt_gatt_utils.py
@@ -64,7 +64,11 @@
return True
-def orchestrate_gatt_connection(cen_ad, per_ad, le=True, mac_address=None):
+def orchestrate_gatt_connection(cen_ad,
+ per_ad,
+ le=True,
+ mac_address=None,
+ autoconnect=False):
adv_callback = None
if mac_address is None:
if le:
@@ -73,7 +77,6 @@
else:
mac_address = per_ad.droid.bluetoothGetLocalAddress()
adv_callback = None
- autoconnect = False
test_result, bluetooth_gatt, gatt_callback = setup_gatt_connection(
cen_ad, mac_address, autoconnect)
if not test_result:
diff --git a/acts/framework/acts/test_utils/bt/bt_test_utils.py b/acts/framework/acts/test_utils/bt/bt_test_utils.py
index 2266f77..7164a8d 100644
--- a/acts/framework/acts/test_utils/bt/bt_test_utils.py
+++ b/acts/framework/acts/test_utils/bt/bt_test_utils.py
@@ -22,6 +22,7 @@
import time
from contextlib2 import suppress
+from subprocess import call
from acts.logger import LoggerProxy
from acts.test_utils.bt.BleEnum import AdvertiseSettingsAdvertiseMode
@@ -37,6 +38,7 @@
from acts.test_utils.bt.BleEnum import ScanSettingsScanResultType
from acts.test_utils.bt.BleEnum import ScanSettingsScanMode
from acts.test_utils.bt.BtEnum import BluetoothScanModeType
+from acts.test_utils.bt.BtEnum import RfcommUuid
from acts.utils import exe_cmd
default_timeout = 15
@@ -170,6 +172,7 @@
d = a.droid
setup_result = d.bluetoothSetLocalName(generate_id_by_size(4))
if not setup_result:
+ log.error("Failed to set device name.")
return setup_result
d.bluetoothDisableBLE()
bonded_devices = d.bluetoothGetBondedDevices()
@@ -178,8 +181,10 @@
for a in android_devices:
setup_result = a.droid.bluetoothConfigHciSnoopLog(True)
if not setup_result:
+ log.error("Failed to enable Bluetooth Hci Snoop Logging.")
return setup_result
- except Exception:
+ except Exception as err:
+ log.error("Something went wrong in multi device setup: {}".format(err))
return False
return setup_result
@@ -256,12 +261,18 @@
try:
for scan_callback in scan_callback_list:
scan_droid.bleStopBleScan(scan_callback)
- except Exception:
+ except Exception as err:
+ log.debug(
+ "Failed to stop LE scan... reseting Bluetooth. Error {}".format(
+ err))
reset_bluetooth([scn_android_device])
try:
for adv_callback in adv_callback_list:
adv_droid.bleStopBleAdvertising(adv_callback)
- except Exception:
+ except Exception as err:
+ log.debug(
+ "Failed to stop LE advertisement... reseting Bluetooth. Error {}".format(
+ err))
reset_bluetooth([adv_android_device])
@@ -368,10 +379,8 @@
return_string = "{} Energy info collection:\n".format(state)
for d in droids:
with suppress(Exception):
- if (d.getBuildModel() == "Nexus 6" or
- d.getBuildModel() == "Nexus 9" or
- d.getBuildModel() == "Nexus 6P" or
- d.getBuildModel() == "Nexus5X"):
+ if (d.getBuildModel() != "Nexus 5" or
+ d.getBuildModel() != "Nexus 4"):
description = ("Device: {}\tEnergyStatus: {}\n".format(
d.getBuildSerial(),
@@ -384,49 +393,32 @@
# Enable discovery on sec_droid so that pri_droid can find it.
# The timeout here is based on how much time it would take for two devices
# to pair with each other once pri_droid starts seeing devices.
- self.log.info("Bonding device {} to {}".format(
- pri_droid.bluetoothGetLocalAddress, sec_droid.bluetoothGetLocalAddress))
+ log.info(
+ "Bonding device {} to {}".format(pri_droid.bluetoothGetLocalAddress(),
+ sec_droid.bluetoothGetLocalAddress()))
sec_droid.bluetoothMakeDiscoverable(default_timeout)
target_address = sec_droid.bluetoothGetLocalAddress()
+ log.debug("Starting paring helper on each device")
pri_droid.bluetoothStartPairingHelper()
sec_droid.bluetoothStartPairingHelper()
+ log.info("Primary device starting discovery and executing bond")
result = pri_droid.bluetoothDiscoverAndBond(target_address)
# Loop until we have bonded successfully or timeout.
end_time = time.time() + default_timeout
+ log.info("Verifying devices are bonded")
while time.time() < end_time:
bonded_devices = pri_droid.bluetoothGetBondedDevices()
bonded = False
for d in bonded_devices:
if d['address'] == target_address:
+ log.info("Successfully bonded to device")
return True
time.sleep(1)
# Timed out trying to bond.
+ log.debug("Failed to bond devices.")
return False
-def get_bt_mac_address(droid, droid1, make_undisocverable=True):
- droid1.bluetoothMakeDiscoverable(default_timeout)
- droid.bluetoothStartDiscovery()
- mac = ""
- target_name = droid1.bluetoothGetLocalAddress()
- time.sleep(default_discovery_timeout)
- discovered_devices = droid.bluetoothGetDiscoveredDevices()
- for device in discovered_devices:
- if 'name' in device.keys() and target_name == device['name']:
- mac = device['address']
- continue
- if make_undisocverable:
- droid1.bluetoothMakeUndiscoverable()
- droid.bluetoothCancelDiscovery()
- if mac == "":
- return False
- return mac
-
-
-def get_client_server_bt_mac_address(droid, droid1):
- return get_bt_mac_address(droid, droid1), get_bt_mac_address(droid1, droid)
-
-
def take_btsnoop_logs(android_devices, testcase, testname):
for a in android_devices:
take_btsnoop_log(a.droid, testcase, testname)
@@ -456,9 +448,61 @@
exe_cmd(cmd)
-def rfcomm_connect(droid, device_address):
- droid.bluetoothRfcommConnect(device_address)
+def kill_bluetooth_process(ad):
+ log.info("Killing Bluetooth process.")
+ pid = ad.adb.shell(
+ "ps | grep com.android.bluetooth | awk '{print $2}'").decode('ascii')
+ call(["adb -s " + ad.serial + " shell kill " + pid],
+ shell=True)
-def rfcomm_accept(droid):
- droid.bluetoothRfcommAccept()
+def rfcomm_connect(ad, device_address):
+ rf_client_ad = ad
+ log.debug("Performing RFCOMM connection to {}".format(device_address))
+ try:
+ ad.droid.bluetoothRfcommConnect(device_address)
+ except Exception as err:
+ log.error("Failed to connect: {}".format(err))
+ ad.droid.bluetoothRfcommCloseSocket()
+ return
+ return
+
+
+def rfcomm_accept(ad):
+ rf_server_ad = ad
+ log.debug("Performing RFCOMM accept")
+ try:
+ ad.droid.bluetoothRfcommAccept(RfcommUuid.DEFAULT_UUID.value,
+ default_timeout)
+ except Exception as err:
+ log.error("Failed to accept: {}".format(err))
+ ad.droid.bluetoothRfcommCloseSocket()
+ return
+ return
+
+
+def write_read_verify_data(client_ad, server_ad, msg, binary=False):
+ log.info("Write message.")
+ try:
+ if binary:
+ client_ad.droid.bluetoothRfcommWriteBinary(msg)
+ else:
+ client_ad.droid.bluetoothRfcommWrite(msg)
+ except Exception as err:
+ log.error("Failed to write data: {}".format(err))
+ return False
+ log.info("Read message.")
+ try:
+ if binary:
+ read_msg = server_ad.droid.bluetoothRfcommReadBinary().rstrip("\r\n")
+ else:
+ read_msg = server_ad.droid.bluetoothRfcommRead()
+ except Exception as err:
+ log.error("Failed to read data: {}".format(err))
+ return False
+ log.info("Verify message.")
+ if msg != read_msg:
+ log.error("Mismatch! Read: {}, Expected: {}".format(
+ read_msg, msg))
+ return False
+ return True
diff --git a/acts/tests/google/ble/api/BleAdvertiseApiTest.py b/acts/tests/google/ble/api/BleAdvertiseApiTest.py
index 2219ccb..90764f8 100644
--- a/acts/tests/google/ble/api/BleAdvertiseApiTest.py
+++ b/acts/tests/google/ble/api/BleAdvertiseApiTest.py
@@ -35,41 +35,10 @@
class BleAdvertiseApiTest(BluetoothBaseTest):
- tests = None
-
def __init__(self, controllers):
BluetoothBaseTest.__init__(self, controllers)
self.droid_list = get_advanced_droid_list(self.android_devices)
self.droid = self.android_devices[0].droid
- self.tests = (
- "test_adv_settings_defaults",
- "test_adv_data_defaults",
- "test_adv_settings_set_adv_mode_balanced",
- "test_adv_settings_set_adv_mode_low_power",
- "test_adv_settings_set_adv_mode_low_latency",
- "test_adv_settings_set_invalid_adv_mode",
- "test_adv_settings_set_adv_tx_power_level_high",
- "test_adv_settings_set_adv_tx_power_level_medium",
- "test_adv_settings_set_adv_tx_power_level_low",
- "test_adv_settings_set_adv_tx_power_level_ultra_low",
- "test_adv_settings_set_invalid_adv_tx_power_level",
- "test_adv_settings_set_is_connectable_true",
- "test_adv_settings_set_is_connectable_false",
- "test_adv_data_set_service_uuids_empty",
- "test_adv_data_set_service_uuids_single",
- "test_adv_data_set_service_uuids_multiple",
- "test_adv_data_set_service_uuids_invalid_uuid",
- "test_adv_data_set_service_data",
- "test_adv_data_set_service_data_invalid_service_data",
- "test_adv_data_set_service_data_invalid_service_data_uuid",
- "test_adv_data_set_manu_id",
- "test_adv_data_set_manu_id_invalid_manu_id",
- "test_adv_data_set_manu_id_invalid_manu_specific_data",
- "test_adv_data_set_manu_id_max",
- "test_adv_data_set_include_tx_power_level_true",
- "test_adv_data_set_include_tx_power_level_false",
- "test_adv_data_set_include_device_name_true",
- "test_adv_data_set_include_device_name_false", )
if self.droid_list[0]['max_advertisements'] > 0:
self.tests = self.tests + (
diff --git a/acts/tests/google/ble/gatt/GattConnectTest.py b/acts/tests/google/ble/gatt/GattConnectTest.py
index a4c6b52..00363f8 100644
--- a/acts/tests/google/ble/gatt/GattConnectTest.py
+++ b/acts/tests/google/ble/gatt/GattConnectTest.py
@@ -222,6 +222,56 @@
gatt_callback)
@BluetoothBaseTest.bt_test_wrap
+ def test_gatt_connect_autoconnect(self):
+ """Test GATT connection over LE.
+
+ Test re-establishing a gat connection using autoconnect
+ set to True in order to test connection whitelist.
+
+ Steps:
+ 1. Start a generic advertisement.
+ 2. Start a generic scanner.
+ 3. Find the advertisement and extract the mac address.
+ 4. Stop the first scanner.
+ 5. Create a GATT connection between the scanner and advertiser.
+ 6. Disconnect the GATT connection.
+ 7. Create a GATT connection with autoconnect set to True
+ 8. Disconnect the GATT connection.
+
+ Expected Result:
+ Verify that a connection was re-established and then disconnected
+ successfully.
+
+ Returns:
+ Pass if True
+ Fail if False
+
+ TAGS: LE, Advertising, Filtering, Scanning, GATT
+ Priority: 0
+ """
+ autoconnect = False
+ mac_address, adv_callback = (
+ get_mac_address_of_generic_advertisement(self.cen_ad, self.per_ad))
+ test_result, bluetooth_gatt, gatt_callback = setup_gatt_connection(
+ self.cen_ad, mac_address, autoconnect)
+ if not disconnect_gatt_connection(self.cen_ad, bluetooth_gatt,
+ gatt_callback):
+ return False
+ autoconnect = True
+ bluetooth_gatt = self.cen_ad.droid.gattClientConnectGatt(
+ gatt_callback, mac_address, autoconnect)
+ expected_event = GattCbStrings.GATT_CONN_CHANGE.value.format(
+ gatt_callback)
+ try:
+ event = self.cen_ad.ed.pop_event(expected_event,
+ self.default_timeout)
+ except Empty:
+ log.error(GattCbErr.GATT_CONN_CHANGE_ERR.value.format(
+ expected_event))
+ test_result = False
+ return True
+
+ @BluetoothBaseTest.bt_test_wrap
def test_gatt_request_min_mtu(self):
"""Test GATT connection over LE and exercise MTU sizes.
diff --git a/acts/tests/google/bt/RfcommTest.py b/acts/tests/google/bt/RfcommTest.py
index 4d939fd..502bc68 100644
--- a/acts/tests/google/bt/RfcommTest.py
+++ b/acts/tests/google/bt/RfcommTest.py
@@ -24,13 +24,14 @@
from queue import Empty
from acts.test_utils.bt.BluetoothBaseTest import BluetoothBaseTest
-from acts.test_utils.bt.bt_test_utils import get_bt_mac_address
from acts.test_utils.bt.bt_test_utils import log_energy_info
+from acts.test_utils.bt.bt_test_utils import kill_bluetooth_process
from acts.test_utils.bt.bt_test_utils import reset_bluetooth
from acts.test_utils.bt.bt_test_utils import rfcomm_accept
from acts.test_utils.bt.bt_test_utils import rfcomm_connect
from acts.test_utils.bt.bt_test_utils import setup_multiple_devices_for_bt_test
from acts.test_utils.bt.bt_test_utils import take_btsnoop_logs
+from acts.test_utils.bt.bt_test_utils import write_read_verity_data
class RfcommTest(BluetoothBaseTest):
@@ -75,22 +76,35 @@
def teardown_test(self):
with suppress(Exception):
- for thread in self.thread_list:
- thread.join()
self.client_ad.droid.bluetoothRfcommStop()
self.server_ad.droid.bluetoothRfcommStop()
+ self.client_ad.droid.bluetoothRfcommCloseSocket()
+ self.server_ad.droid.bluetoothRfcommCloseSocket()
+ for thread in self.thread_list:
+ thread.join()
+ self.thread_list.clear()
def orchestrate_rfcomm_connect(self, server_mac):
accept_thread = threading.Thread(target=rfcomm_accept,
- args=(self.server_ad.droid, ))
+ args=(self.server_ad, ))
self.thread_list.append(accept_thread)
accept_thread.start()
- connect_thread = threading.Thread(
- target=rfcomm_connect,
- args=(self.client_ad.droid, server_mac))
+ connect_thread = threading.Thread(target=rfcomm_connect,
+ args=(self.client_ad, server_mac))
self.rf_client_th = connect_thread
self.thread_list.append(connect_thread)
connect_thread.start()
+ for thread in self.thread_list:
+ thread.join()
+ end_time = time.time() + self.default_timeout
+ result = False
+ while time.time() < end_time:
+ if len(self.client_ad.droid.bluetoothRfcommActiveConnections(
+ )) > 0:
+ self.log.info("RFCOMM Connection Active")
+ return True
+ self.log.error("Failed to establish an RFCOMM connection")
+ return False
@BluetoothBaseTest.bt_test_wrap
def test_rfcomm_connection_write_ascii(self):
@@ -118,13 +132,11 @@
Priority: 1
"""
server_mac = self.server_ad.droid.bluetoothGetLocalAddress()
- self.orchestrate_rfcomm_connect(server_mac)
- self.log.info("Write message.")
- self.client_ad.droid.bluetoothRfcommWrite(self.message)
- self.log.info("Read message.")
- read_msg = self.server_ad.droid.bluetoothRfcommRead()
- self.log.info("Verify message.")
- assert self.message == read_msg, "Mismatch! Read {}".format(read_msg)
+ if not self.orchestrate_rfcomm_connect(server_mac):
+ return False
+ if not write_read_verify_data(self.client_ad, self.server_ad,
+ self.message, False):
+ return False
if len(self.server_ad.droid.bluetoothRfcommActiveConnections()) == 0:
self.log.info("No rfcomm connections found on server.")
return False
@@ -162,14 +174,12 @@
Priority: 1
"""
server_mac = self.server_ad.droid.bluetoothGetLocalAddress()
- self.orchestrate_rfcomm_connect(server_mac)
+ if not self.orchestrate_rfcomm_connect(server_mac):
+ return False
binary_message = "11010101"
- self.log.info("Write message.")
- self.client_ad.droid.bluetoothRfcommWriteBinary(binary_message)
- self.log.info("Read message.")
- read_msg = self.server_ad.droid.bluetoothRfcommReadBinary().rstrip("\r\n")
- self.log.info("Verify message.")
- assert binary_message == read_msg, "Mismatch! Read {}".format(read_msg)
+ if not write_read_verify_data(self.client_ad, self.server_ad,
+ binary_message, True):
+ return False
if len(self.server_ad.droid.bluetoothRfcommActiveConnections()) == 0:
self.log.info("No rfcomm connections found on server.")
return False
diff --git a/acts/tests/google/bt/system_tests/BtStressTest.py b/acts/tests/google/bt/system_tests/BtStressTest.py
index 2e1c74f..193fa52 100644
--- a/acts/tests/google/bt/system_tests/BtStressTest.py
+++ b/acts/tests/google/bt/system_tests/BtStressTest.py
@@ -69,14 +69,16 @@
TAGS: Classic, Stress
Priority: 1
"""
- n = 0
test_result = True
test_result_list = []
- while n < 100:
- self.log.info("Toggling bluetooth iteration {}.".format(n+1))
+ for n in range(100):
+ self.log.info("Toggling bluetooth iteration {}.".format(n + 1))
test_result = reset_bluetooth([self.android_devices[0]])
test_result_list.append(test_result)
- n += 1
+ if not test_result:
+ self.log.debug("Failure to reset Bluetooth... continuing")
+ self.log.info("Toggling Bluetooth failed {}/100 times".format(len(
+ test_result_list)))
if False in test_result_list:
return False
return test_result
@@ -105,10 +107,9 @@
Priority: 1
"""
for n in range(100):
- self.log.info("Pair bluetooth iteration {}.".format(n+1))
- if (pair_pri_to_sec(
- self.android_devices[0].droid,
- self.android_devices[1].droid) == False):
+ self.log.info("Pair bluetooth iteration {}.".format(n + 1))
+ if (pair_pri_to_sec(self.android_devices[0].droid,
+ self.android_devices[1].droid) == False):
self.log.error("Failed to bond devices.")
return False
for ad in self.android_devices:
@@ -119,7 +120,7 @@
time.sleep(1)
bonded_devices = ad.droid.bluetoothGetBondedDevices()
if len(bonded_devices) > 0:
- self.log.error("Failed to unbond devices: {}".format(bonded_devices))
+ self.log.error("Failed to unbond devices: {}".format(
+ bonded_devices))
return False
return True
-
diff --git a/acts/tests/google/bt/system_tests/RfcommLongevityTest.py b/acts/tests/google/bt/system_tests/RfcommLongevityTest.py
index bc02f19..5e72c54 100644
--- a/acts/tests/google/bt/system_tests/RfcommLongevityTest.py
+++ b/acts/tests/google/bt/system_tests/RfcommLongevityTest.py
@@ -27,11 +27,10 @@
from acts.test_utils.bt.BluetoothBaseTest import BluetoothBaseTest
from acts.test_utils.bt.bt_test_utils import reset_bluetooth
from acts.test_utils.bt.BluetoothBaseTest import BluetoothBaseTest
-from acts.test_utils.bt.bt_test_utils import get_bt_mac_address
from acts.test_utils.bt.bt_test_utils import rfcomm_accept
from acts.test_utils.bt.bt_test_utils import rfcomm_connect
from acts.test_utils.bt.bt_test_utils import take_btsnoop_logs
-
+from acts.test_utils.bt.bt_test_utils import write_read_verify_data
class RfcommLongevityTest(BluetoothBaseTest):
default_timeout = 10
@@ -61,12 +60,12 @@
def orchestrate_rfcomm_connect(self, server_mac):
accept_thread = threading.Thread(target=rfcomm_accept,
- args=(self.server_ad.droid, ))
+ args=(self.server_ad, ))
self.thread_list.append(accept_thread)
accept_thread.start()
connect_thread = threading.Thread(
target=rfcomm_connect,
- args=(self.client_ad.droid, server_mac))
+ args=(self.client_ad, server_mac))
self.thread_list.append(connect_thread)
connect_thread.start()
@@ -104,13 +103,9 @@
for n in range(write_iterations):
self.log.info("iteration {} data".format(((n + 1) + (
i * write_iterations))))
- self.log.info("Write message.")
- self.client_ad.droid.bluetoothRfcommWrite(self.generic_message)
- self.log.info("Read message.")
- read_msg = self.server_ad.droid.bluetoothRfcommRead()
- self.log.info("Verify message.")
- assert self.generic_message == read_msg, "Mismatch! Read {}".format(
- read_msg)
+ if not write_read_verify_data(self.client_ad, self.server_ad,
+ self.generic_message, False):
+ return False
self.log.info("Iteration {} completed".format(n))
self.client_ad.droid.bluetoothRfcommStop()
self.server_ad.droid.bluetoothRfcommStop()
@@ -155,13 +150,9 @@
for n in range(write_iterations):
self.log.info("iteration {} data".format(((n + 1) + (
i * write_iterations))))
- self.log.info("Write message.")
- self.client_ad.droid.bluetoothRfcommWrite(message)
- self.log.info("Read message.")
- read_msg = self.server_ad.droid.bluetoothRfcommRead()
- self.log.info("Verify message.")
- assert message == read_msg, "Mismatch! Read {}".format(
- read_msg)
+ if not write_read_verify_data(self.client_ad, self.server_ad,
+ message, False):
+ return False
self.log.info("Iteration {} completed".format(n))
self.client_ad.droid.bluetoothRfcommStop()
self.server_ad.droid.bluetoothRfcommStop()
@@ -198,7 +189,6 @@
Priority: 2
"""
server_mac = self.server_ad.droid.bluetoothGetLocalAddress()
- #message = "1001010101" #todo: investigate why this fails...
binary_message = "11010101"
write_iterations = 5000
for i in range(self.longev_iterations):
@@ -207,13 +197,9 @@
for n in range(write_iterations):
self.log.info("iteration {} data".format(((n + 1) + (
i * write_iterations))))
- self.log.info("Write message.")
- self.client_ad.droid.bluetoothRfcommWriteBinary(binary_message)
- self.log.info("Read message.")
- read_msg = self.server_ad.droid.bluetoothRfcommReadBinary().rstrip("\r\n")
- self.log.info("Verify message.")
- assert binary_message == read_msg, "Mismatch! Read {}".format(
- read_msg)
+ if not write_read_verify_data(self.client_ad, self.server_ad,
+ binary_message, True):
+ return False
self.log.info("Iteration {} completed".format(n))
self.client_ad.droid.bluetoothRfcommStop()
self.server_ad.droid.bluetoothRfcommStop()
@@ -258,13 +244,9 @@
for n in range(write_iterations):
self.log.info("iteration {} data".format(((n + 1) + (
i * write_iterations))))
- self.log.info("Write message.")
- self.client_ad.droid.bluetoothRfcommWrite(message)
- self.log.info("Read message.")
- read_msg = self.server_ad.droid.bluetoothRfcommRead()
- self.log.info("Verify message.")
- assert message == read_msg, "Mismatch! Read {}".format(
- read_msg)
+ if not write_read_verify_data(self.client_ad, self.server_ad,
+ message, False):
+ return False
self.log.info("Iteration {} completed".format(n))
self.client_ad.droid.bluetoothRfcommStop()
self.server_ad.droid.bluetoothRfcommStop()
@@ -313,14 +295,9 @@
for n in range(write_iterations):
self.log.info("iteration {} data".format(((n + 1) + (
i * write_iterations))))
- self.log.info("Write message.")
- self.client_ad.droid.bluetoothRfcommWrite(
- self.generic_message)
- self.log.info("Read message.")
- read_msg = self.server_ad.droid.bluetoothRfcommRead()
- self.log.info("Verify message.")
- assert self.generic_message == read_msg, "Mismatch! Read {}".format(
- read_msg)
+ if not write_read_verify_data(self.client_ad, self.server_ad,
+ self.generic_message, False):
+ return False
self.log.info("Iteration {} completed".format(n))
if n > random_interup_iteration:
self.client_ad.droid.bluetoothRfcommCloseSocket()
@@ -390,13 +367,9 @@
for n in range(write_iterations):
self.log.info("iteration {} data".format(((n + 1) + (
i * write_iterations))))
- self.log.info("Write message.")
- self.client_ad.droid.bluetoothRfcommWrite(message)
- self.log.info("Read message.")
- read_msg = self.server_ad.droid.bluetoothRfcommRead()
- self.log.info("Verify message.")
- assert message == read_msg, "Mismatch! Read {}".format(
- read_msg)
+ if not write_read_verify_data(self.client_ad, self.server_ad,
+ message, False):
+ return False
self.log.info("Iteration {} completed".format(n))
size_of_message = len(message)
#max size is 990 due to a bug in sl4a.
diff --git a/acts/tests/google/bt/system_tests/RfcommStressTest.py b/acts/tests/google/bt/system_tests/RfcommStressTest.py
index 0f0b20e..75d5a14 100644
--- a/acts/tests/google/bt/system_tests/RfcommStressTest.py
+++ b/acts/tests/google/bt/system_tests/RfcommStressTest.py
@@ -63,7 +63,7 @@
accept_thread.start()
connect_thread = threading.Thread(
target=rfcomm_connect,
- args=(self.client_ad.droid, server_mac))
+ args=(self.client_ad, server_mac))
self.thread_list.append(connect_thread)
connect_thread.start()