WENTAO WANG | b4da3a5 | 2017-11-02 14:37:56 -0700 | [diff] [blame] | 1 | """Bluetooth tests.""" |
| 2 | |
| 3 | |
| 4 | |
| 5 | from mobly import asserts |
Yuexi Ma | b4da9e0 | 2017-12-22 02:51:25 -0800 | [diff] [blame^] | 6 | from mobly import test_runner |
WENTAO WANG | b4da3a5 | 2017-11-02 14:37:56 -0700 | [diff] [blame] | 7 | from mobly import utils |
| 8 | from utils import android_base_test |
| 9 | |
| 10 | # Number of seconds for the target to stay discoverable on Bluetooth. |
| 11 | DISCOVERABLE_TIME = 180 |
| 12 | # Random DATA to represent device bluetooth name. |
| 13 | DEVICE_NAME = utils.rand_ascii_str(8) |
| 14 | # UUID value for RfComm connection |
| 15 | RFCOMM_UUID = 'fa87c0d0-afac-11de-8a39-0800200c9a66' |
| 16 | |
| 17 | |
| 18 | def DiscoverBluetoothDeviceByName(device, name): |
| 19 | """Start a bluetooth scan and find the device that has the bluetooth name. |
| 20 | |
| 21 | Args: |
| 22 | device: AndroidDevice. Device to start bluetooth scan. |
| 23 | name: the bluetooth name looking for. |
| 24 | |
| 25 | Returns: |
| 26 | Dict represents a bluetooth device if found. |
| 27 | """ |
| 28 | discovered_devices = device.android.btDiscoverAndGetResults() |
| 29 | for device in discovered_devices: |
| 30 | if name == device['Name']: |
| 31 | device_discovered = device |
| 32 | return device_discovered |
| 33 | asserts.fail('Failed to discover the target device %s over Bluetooth.' % name) |
| 34 | |
| 35 | |
| 36 | class BluetoothTest(android_base_test.AndroidBaseTest): |
| 37 | """Bluetooth tests.""" |
| 38 | |
| 39 | def setup_class(self): |
| 40 | super(BluetoothTest, self).setup_class() |
| 41 | self.initiator = self.dut_a |
| 42 | # Sets the tag that represents this device in logs. |
| 43 | self.initiator.debug_tag = 'initiator' |
| 44 | # The device that is expected to be discovered and receive messages. |
| 45 | self.receiver = self.dut_b |
| 46 | self.receiver.debug_tag = 'receiver' |
| 47 | |
| 48 | def setup_test(self): |
| 49 | # Make sure bluetooth is on. |
| 50 | self.initiator.android.btEnable() |
| 51 | self.receiver.android.btEnable() |
| 52 | # Set Bluetooth name on target device. |
| 53 | self.receiver.android.btSetName(DEVICE_NAME) |
| 54 | |
| 55 | def test_bluetooth_process(self): |
| 56 | """Test for basic bluetooth rfcomm process flow. |
| 57 | |
| 58 | Steps: |
| 59 | 1. Receiver becomes discoverable. |
| 60 | 2. Initiator discovers receiver via bluetooth. |
| 61 | 3. Initiator connects to receiver via rfcomm profile. |
| 62 | 4. Initiator sends a message to receiver and receiver receives the exact |
| 63 | message. |
| 64 | |
| 65 | Verifies: |
| 66 | Receiver receives the correct message. |
| 67 | """ |
| 68 | # Name value for RfComm connection |
| 69 | rfcomm_name = utils.rand_ascii_str(8) |
| 70 | self.receiver.connection_callback = ( |
| 71 | self.receiver.android.btRfcommStartServer(rfcomm_name, RFCOMM_UUID)) |
| 72 | self.receiver.log.info('Start Rfcomm server with name: %s uuid: %s' % |
| 73 | (rfcomm_name, RFCOMM_UUID)) |
| 74 | target_name = self.receiver.android.btGetName() |
| 75 | self.receiver.log.info('Become discoverable with name "%s" for %ds.', |
| 76 | target_name, DISCOVERABLE_TIME) |
| 77 | self.receiver.android.btBecomeDiscoverable(DISCOVERABLE_TIME) |
| 78 | self.initiator.log.info('Looking for Bluetooth devices.') |
| 79 | discovered_device = DiscoverBluetoothDeviceByName( |
| 80 | self.initiator, target_name) |
| 81 | self.initiator.log.info('Target device is found. Device: %s' |
| 82 | % discovered_device) |
| 83 | remote_address = discovered_device['Address'] |
| 84 | self.initiator.android.btRfcommConnect(remote_address, RFCOMM_UUID) |
| 85 | self.receiver.connection_callback.waitAndGet('onAccepted', 30) |
| 86 | # self.initiator.connection_callback.waitAndGet('onConnected', 30) |
| 87 | self.initiator.log.info('Connection established') |
| 88 | # Random data to be sent through bluetooth rfcomm. |
| 89 | data = utils.rand_ascii_str(8) |
| 90 | self.receiver.read_callback = self.receiver.android.btRfcommRead() |
| 91 | self.initiator.android.btRfcommWrite(data) |
| 92 | read_result = self.receiver.read_callback.waitAndGet('onDataAvailable', 30) |
| 93 | asserts.assert_equal(read_result.data['Data'], data) |
| 94 | self.receiver.log.info('Received correct message from the other side') |
| 95 | self.initiator.android.btRfcommDisconnect() |
| 96 | self.receiver.android.btRfcommStopServer() |
| 97 | |
| 98 | def teardown_test(self): |
| 99 | # Turn Bluetooth off on both devices after test finishes. |
| 100 | self.receiver.android.btDisable() |
| 101 | self.initiator.android.btDisable() |
| 102 | |
| 103 | |
| 104 | if __name__ == '__main__': |
Yuexi Ma | b4da9e0 | 2017-12-22 02:51:25 -0800 | [diff] [blame^] | 105 | test_runner.main() |