blob: 5df6e0868b135e4edc399a41851f0d1531e213d4 [file] [log] [blame]
# Copyright 2021-2022 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# -----------------------------------------------------------------------------
# Imports
# -----------------------------------------------------------------------------
import asyncio
import logging
import os
import struct
import pytest
from bumble.controller import Controller
from bumble.link import LocalLink
from bumble.device import Device, Peer
from bumble.host import Host
from bumble.gatt import (
GATT_BATTERY_LEVEL_CHARACTERISTIC,
CharacteristicAdapter,
DelegatedCharacteristicAdapter,
PackedCharacteristicAdapter,
MappedCharacteristicAdapter,
UTF8CharacteristicAdapter,
Service,
Characteristic,
CharacteristicValue
)
from bumble.transport import AsyncPipeSink
from bumble.core import UUID
from bumble.att import (
ATT_EXCHANGE_MTU_REQUEST,
ATT_ATTRIBUTE_NOT_FOUND_ERROR,
ATT_PDU,
ATT_Error_Response,
ATT_Read_By_Group_Type_Request
)
# -----------------------------------------------------------------------------
def basic_check(x):
    """Assert that a PDU survives a serialize/parse round trip.

    The object is serialized with ``to_bytes()``, parsed back through
    ``ATT_PDU.from_bytes()``, and the string forms of the original and the
    re-parsed object are required to be equal.
    """
    reparsed = ATT_PDU.from_bytes(x.to_bytes())
    original_text = str(x)
    reparsed_text = str(reparsed)
    assert original_text == reparsed_text
# -----------------------------------------------------------------------------
def test_UUID():
    """Check UUID construction, string rendering and byte round trips."""
    # Short forms built from integers
    short16 = UUID.from_16_bits(0x7788)
    assert str(short16) == 'UUID-16:7788'
    short32 = UUID.from_32_bits(0x11223344)
    assert str(short32) == 'UUID-32:11223344'

    # Full-length form: built from text, from its own string, and from bytes
    full_text = '61A3512C-09BE-4DDC-A6A6-0B03667AAFC6'
    from_text = UUID(full_text)
    assert str(from_text) == full_text
    from_string = UUID(str(from_text))
    assert str(from_string) == full_text
    from_raw = UUID.from_bytes(from_string.to_bytes())
    assert str(from_raw) == full_text

    # A 16-bit UUID serialized with force_128 must compare equal once parsed
    compact = UUID.from_16_bits(0x1234)
    expanded = UUID.from_bytes(compact.to_bytes(force_128=True))
    assert compact == expanded

    # This 16-bit value is rendered with a descriptive name appended
    named = UUID.from_16_bits(0x180a)
    assert str(named) == 'UUID-16:180A (Device Information)'
# -----------------------------------------------------------------------------
def test_ATT_Error_Response():
    """Round-trip an ATT_Error_Response through basic_check()."""
    fields = {
        'request_opcode_in_error': ATT_EXCHANGE_MTU_REQUEST,
        'attribute_handle_in_error': 0x0000,
        'error_code': ATT_ATTRIBUTE_NOT_FOUND_ERROR,
    }
    basic_check(ATT_Error_Response(**fields))
# -----------------------------------------------------------------------------
def test_ATT_Read_By_Group_Type_Request():
    """Round-trip an ATT_Read_By_Group_Type_Request through basic_check()."""
    group_type = UUID.from_16_bits(0x2800)
    request = ATT_Read_By_Group_Type_Request(
        starting_handle=0x0001,
        ending_handle=0xFFFF,
        attribute_group_type=group_type,
    )
    basic_check(request)
# -----------------------------------------------------------------------------
def test_CharacteristicAdapter():
    """Check the read/write value conversions of each characteristic adapter.

    One Characteristic is reused throughout; each section wraps it in a
    different adapter, reads through the adapter, then writes through it
    and checks the resulting value.
    """

    def flip(data):
        # Byte-order reversal used for both directions of the delegated adapter
        return bytes(reversed(data))

    # The CharacteristicAdapter base class must be transparent
    initial = bytes([1, 2, 3])
    characteristic = Characteristic(
        GATT_BATTERY_LEVEL_CHARACTERISTIC,
        Characteristic.READ,
        Characteristic.READABLE,
        initial,
    )
    adapter = CharacteristicAdapter(characteristic)
    assert adapter.read_value(None) == initial
    raw = bytes([3, 4, 5])
    adapter.write_value(None, raw)
    assert characteristic.value == raw

    # Simple delegated adapter
    adapter = DelegatedCharacteristicAdapter(characteristic, flip, flip)
    assert adapter.read_value(None) == flip(raw)
    raw = bytes([3, 4, 5])
    adapter.write_value(None, raw)
    assert adapter.value == flip(raw)

    # Packed adapter with single element format
    number = 1234
    packed = struct.pack('>H', number)
    characteristic.value = number
    adapter = PackedCharacteristicAdapter(characteristic, '>H')
    assert adapter.read_value(None) == packed
    characteristic.value = None
    adapter.write_value(None, packed)
    assert adapter.value == number

    # Packed adapter with multi-element format
    first, second = 1234, 5678
    packed = struct.pack('>HH', first, second)
    characteristic.value = (first, second)
    adapter = PackedCharacteristicAdapter(characteristic, '>HH')
    assert adapter.read_value(None) == packed
    characteristic.value = None
    adapter.write_value(None, packed)
    assert adapter.value == (first, second)

    # Mapped adapter
    first, second = 1234, 5678
    packed = struct.pack('>HH', first, second)
    by_name = {'v1': first, 'v2': second}
    characteristic.value = by_name
    adapter = MappedCharacteristicAdapter(characteristic, '>HH', ('v1', 'v2'))
    assert adapter.read_value(None) == packed
    characteristic.value = None
    adapter.write_value(None, packed)
    assert adapter.value == by_name

    # UTF-8 adapter
    text = 'Hello π'
    encoded = text.encode('utf-8')
    characteristic.value = text
    adapter = UTF8CharacteristicAdapter(characteristic)
    assert adapter.read_value(None) == encoded
    characteristic.value = None
    adapter.write_value(None, encoded)
    assert adapter.value == text
# -----------------------------------------------------------------------------
def test_CharacteristicValue():
    """Check that CharacteristicValue forwards read and write to its callbacks."""
    payload = bytes([1, 2, 3])

    # read() must return what the read callback returns
    readable = CharacteristicValue(read=lambda _: payload)
    assert readable.read(None) == payload

    # write() must hand both the connection and the value to the write callback
    recorded = []

    def record_write(connection, value):
        recorded.append((connection, value))

    writable = CharacteristicValue(write=record_write)
    marker = object()
    writable.write(marker, payload)
    assert recorded == [(marker, payload)]
# -----------------------------------------------------------------------------
class TwoDevices:
    """Test fixture: two Devices whose controllers share one LocalLink.

    Attributes set by the constructor:
        connections: two-slot list, initially ``[None, None]``
        link:        the LocalLink both controllers are attached to
        controllers: the two Controller instances ('C1' and 'C2')
        devices:     the two Device instances, one per controller
        paired:      two-slot list, initially ``[None, None]``
    """

    def __init__(self):
        self.connections = [None, None]

        self.link = LocalLink()
        self.controllers = [
            Controller(name, link=self.link) for name in ('C1', 'C2')
        ]

        # One device per controller, each with its own host wired to a
        # pipe sink feeding that same controller
        addresses = ('F0:F1:F2:F3:F4:F5', 'F5:F4:F3:F2:F1:F0')
        self.devices = [
            Device(
                address=address,
                host=Host(controller, AsyncPipeSink(controller)),
            )
            for address, controller in zip(addresses, self.controllers)
        ]

        self.paired = [None, None]
# -----------------------------------------------------------------------------
async def async_barrier():
    """Suspend until a callback scheduled with ``call_soon`` has run.

    A future is created on the running loop and resolved by a ``call_soon``
    callback; awaiting it hands control back to the loop until then.
    """
    loop = asyncio.get_running_loop()
    done = loop.create_future()
    loop.call_soon(done.set_result, None)
    await done
# -----------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_read_write():
    """Read and write two characteristics over a client/server connection.

    characteristic1 stores its value directly and records writes through a
    'write' event listener. characteristic2 is backed by a
    CharacteristicValue with explicit read and write callbacks.
    """
    [client, server] = TwoDevices().devices

    characteristic1 = Characteristic(
        'FDB159DB-036C-49E3-B3DB-6325AC750806',
        Characteristic.READ | Characteristic.WRITE,
        Characteristic.READABLE | Characteristic.WRITEABLE
    )

    def on_characteristic1_write(connection, value):
        # Remember the last write so the test can inspect it later
        characteristic1._last_value = (connection, value)

    characteristic1.on('write', on_characteristic1_write)

    def on_characteristic2_read(connection):
        # bytes() raises TypeError for a str argument without an encoding,
        # so encode the address text explicitly.
        return str(connection.peer_address).encode('utf-8')

    def on_characteristic2_write(connection, value):
        # Remember the last write so the test can inspect it later
        characteristic2._last_value = (connection, value)

    characteristic2 = Characteristic(
        '66DE9057-C848-4ACA-B993-D675644EBB85',
        Characteristic.READ | Characteristic.WRITE,
        Characteristic.READABLE | Characteristic.WRITEABLE,
        CharacteristicValue(read=on_characteristic2_read, write=on_characteristic2_write)
    )

    service1 = Service(
        '3A657F47-D34F-46B3-B1EC-698E29B6B829',
        [
            characteristic1,
            characteristic2
        ]
    )
    server.add_services([service1])

    await client.power_on()
    await server.power_on()
    connection = await client.connect(server.random_address)
    peer = Peer(connection)

    # Discover the remote view of both characteristics
    await peer.discover_services()
    await peer.discover_characteristics()
    c = peer.get_characteristics_by_uuid(characteristic1.uuid)
    assert len(c) == 1
    c1 = c[0]
    c = peer.get_characteristics_by_uuid(characteristic2.uuid)
    assert len(c) == 1
    c2 = c[0]

    # characteristic1 was created without a value: it reads back as empty
    v1 = await peer.read_value(c1)
    assert v1 == b''

    # A remote write must update the local value and fire the 'write' event
    b = bytes([1, 2, 3])
    await peer.write_value(c1, b)
    await async_barrier()
    assert characteristic1.value == b
    v1 = await peer.read_value(c1)
    assert v1 == b
    assert type(characteristic1._last_value) is tuple
    assert len(characteristic1._last_value) == 2
    assert str(characteristic1._last_value[0].peer_address) == str(client.random_address)
    assert characteristic1._last_value[1] == b

    # A local value change must be visible to a remote read
    bb = bytes([3, 4, 5, 6])
    characteristic1.value = bb
    v1 = await peer.read_value(c1)
    assert v1 == bb

    # A remote write to characteristic2 must reach its write callback
    await peer.write_value(c2, b)
    await async_barrier()
    assert type(characteristic2._last_value) is tuple
    assert len(characteristic2._last_value) == 2
    assert str(characteristic2._last_value[0].peer_address) == str(client.random_address)
    assert characteristic2._last_value[1] == b
# -----------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_read_write2():
    """Read and write one characteristic through service-level discovery.

    The remote characteristic is located through its service proxy, then
    accessed both directly and through a PackedCharacteristicAdapter using
    the '>I' format.
    """
    client, server = TwoDevices().devices

    initial = bytes([0x11, 0x22, 0x33, 0x44])
    characteristic1 = Characteristic(
        'FDB159DB-036C-49E3-B3DB-6325AC750806',
        Characteristic.READ | Characteristic.WRITE,
        Characteristic.READABLE | Characteristic.WRITEABLE,
        value=initial,
    )
    service1 = Service('3A657F47-D34F-46B3-B1EC-698E29B6B829', [characteristic1])
    server.add_services([service1])

    await client.power_on()
    await server.power_on()
    connection = await client.connect(server.random_address)
    peer = Peer(connection)

    # Find the service, then discover characteristics on that service only
    await peer.discover_services()
    services = peer.get_services_by_uuid(service1.uuid)
    assert len(services) == 1
    remote_service = services[0]
    await remote_service.discover_characteristics()
    characteristics = remote_service.get_characteristics_by_uuid(characteristic1.uuid)
    assert len(characteristics) == 1
    remote = characteristics[0]

    # Direct read returns the raw bytes
    assert await remote.read_value() == initial

    # Read through the adapter returns the unpacked integer
    packed_view = PackedCharacteristicAdapter(remote, '>I')
    (initial_number,) = struct.unpack('>I', initial)
    assert await packed_view.read_value() == initial_number

    # Write an integer through the adapter; the server side sees packed bytes
    updated = bytes([0x55, 0x66, 0x77, 0x88])
    (updated_number,) = struct.unpack('>I', updated)
    await packed_view.write_value(updated_number)
    await async_barrier()
    assert characteristic1.value == updated
    assert await packed_view.read_value() == updated_number
# -----------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_subscribe_notify():
    """Subscribe to three characteristics and check notify/indicate delivery.

    characteristic1 has the NOTIFY property, characteristic2 has INDICATE,
    and characteristic3 has both. Each local characteristic records its
    latest 'subscription' event, and the server records its latest
    'characteristic_subscription' event.
    """
    client, server = TwoDevices().devices

    def track_subscriptions(characteristic):
        # Store the most recent 'subscription' event on the characteristic
        def on_subscription(connection, notify_enabled, indicate_enabled):
            characteristic._last_subscription = (connection, notify_enabled, indicate_enabled)

        characteristic.on('subscription', on_subscription)

    characteristic1 = Characteristic(
        'FDB159DB-036C-49E3-B3DB-6325AC750806',
        Characteristic.READ | Characteristic.NOTIFY,
        Characteristic.READABLE,
        bytes([1, 2, 3]),
    )
    track_subscriptions(characteristic1)

    characteristic2 = Characteristic(
        '66DE9057-C848-4ACA-B993-D675644EBB85',
        Characteristic.READ | Characteristic.INDICATE,
        Characteristic.READABLE,
        bytes([4, 5, 6]),
    )
    track_subscriptions(characteristic2)

    characteristic3 = Characteristic(
        'AB5E639C-40C1-4238-B9CB-AF41F8B806E4',
        Characteristic.READ | Characteristic.NOTIFY | Characteristic.INDICATE,
        Characteristic.READABLE,
        bytes([7, 8, 9]),
    )
    track_subscriptions(characteristic3)

    service1 = Service(
        '3A657F47-D34F-46B3-B1EC-698E29B6B829',
        [characteristic1, characteristic2, characteristic3],
    )
    server.add_services([service1])

    def on_characteristic_subscription(connection, characteristic, notify_enabled, indicate_enabled):
        server._last_subscription = (connection, characteristic, notify_enabled, indicate_enabled)

    server.on('characteristic_subscription', on_characteristic_subscription)

    await client.power_on()
    await server.power_on()
    connection = await client.connect(server.random_address)
    peer = Peer(connection)

    await peer.discover_services()
    await peer.discover_characteristics()

    def find_remote(local_characteristic):
        # Exactly one remote characteristic must match the local UUID
        matches = peer.get_characteristics_by_uuid(local_characteristic.uuid)
        assert len(matches) == 1
        return matches[0]

    c1 = find_remote(characteristic1)
    c2 = find_remote(characteristic2)
    c3 = find_remote(characteristic3)

    # --- characteristic1: listener attached through the 'update' event ---
    c1._last_update = None

    def on_c1_update(connection, value):
        c1._last_update = (connection, value)

    c1.on('update', on_c1_update)
    await peer.subscribe(c1)
    await async_barrier()
    assert server._last_subscription[1] == characteristic1
    assert server._last_subscription[2]
    assert not server._last_subscription[3]
    assert characteristic1._last_subscription[1]
    assert not characteristic1._last_subscription[2]

    # An indication must not reach the client; a notification must
    await server.indicate_subscribers(characteristic1)
    await async_barrier()
    assert c1._last_update is None
    await server.notify_subscribers(characteristic1)
    await async_barrier()
    assert c1._last_update is not None
    assert c1._last_update[1] == characteristic1.value

    # --- characteristic2: callback passed directly to subscribe() ---
    c2._last_update = None

    def on_c2_update(value):
        # This callback only receives the value; the connection comes
        # from the enclosing scope
        c2._last_update = (connection, value)

    await peer.subscribe(c2, on_c2_update)
    await async_barrier()

    # A notification must not reach the client; an indication must
    subscriber2 = characteristic2._last_subscription[0]
    await server.notify_subscriber(subscriber2, characteristic2)
    await async_barrier()
    assert c2._last_update is None
    await server.indicate_subscriber(characteristic2._last_subscription[0], characteristic2)
    await async_barrier()
    assert c2._last_update is not None
    assert c2._last_update[1] == characteristic2.value

    # --- characteristic3: listener attached through the 'update' event ---
    c3._last_update = None

    def on_c3_update(connection, value):
        c3._last_update = (connection, value)

    c3.on('update', on_c3_update)
    await peer.subscribe(c3)
    await async_barrier()

    # First a notification, then an indication carrying a changed value
    await server.notify_subscriber(characteristic3._last_subscription[0], characteristic3)
    await async_barrier()
    assert c3._last_update is not None
    assert c3._last_update[1] == characteristic3.value
    characteristic3.value = bytes([1, 2, 3])
    await server.indicate_subscriber(characteristic3._last_subscription[0], characteristic3)
    await async_barrier()
    assert c3._last_update is not None
    assert c3._last_update[1] == characteristic3.value
# -----------------------------------------------------------------------------
async def async_main():
    """Run the asynchronous tests one after another, in a fixed order."""
    for async_test in (test_read_write, test_read_write2, test_subscribe_notify):
        await async_test()
# -----------------------------------------------------------------------------
if __name__ == '__main__':
    # Log level is taken from BUMBLE_LOGLEVEL (default 'INFO'), upper-cased
    log_level = os.environ.get('BUMBLE_LOGLEVEL', 'INFO').upper()
    logging.basicConfig(level=log_level)

    # Synchronous tests first, then the asynchronous ones on a fresh loop
    for sync_test in (
        test_UUID,
        test_ATT_Error_Response,
        test_ATT_Read_By_Group_Type_Request,
        test_CharacteristicValue,
        test_CharacteristicAdapter,
    ):
        sync_test()
    asyncio.run(async_main())