blob: 22b683e4eb6a46629eeff388507f4ee7f8d348ef [file] [log] [blame]
# Copyright 2021-2022 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# -----------------------------------------------------------------------------
# ATT - Attribute Protocol
#
# See Bluetooth spec @ Vol 3, Part F
#
# -----------------------------------------------------------------------------
# -----------------------------------------------------------------------------
# Imports
# -----------------------------------------------------------------------------
from colors import color
from pyee import EventEmitter
from .core import *
from .hci import *
# -----------------------------------------------------------------------------
# Constants
# -----------------------------------------------------------------------------
ATT_CID = 0x04
ATT_ERROR_RESPONSE = 0x01
ATT_EXCHANGE_MTU_REQUEST = 0x02
ATT_EXCHANGE_MTU_RESPONSE = 0x03
ATT_FIND_INFORMATION_REQUEST = 0x04
ATT_FIND_INFORMATION_RESPONSE = 0x05
ATT_FIND_BY_TYPE_VALUE_REQUEST = 0x06
ATT_FIND_BY_TYPE_VALUE_RESPONSE = 0x07
ATT_READ_BY_TYPE_REQUEST = 0x08
ATT_READ_BY_TYPE_RESPONSE = 0x09
ATT_READ_REQUEST = 0x0A
ATT_READ_RESPONSE = 0x0B
ATT_READ_BLOB_REQUEST = 0x0C
ATT_READ_BLOB_RESPONSE = 0x0D
ATT_READ_MULTIPLE_REQUEST = 0x0E
ATT_READ_MULTIPLE_RESPONSE = 0x0F
ATT_READ_BY_GROUP_TYPE_REQUEST = 0x10
ATT_READ_BY_GROUP_TYPE_RESPONSE = 0x11
ATT_WRITE_REQUEST = 0x12
ATT_WRITE_RESPONSE = 0x13
ATT_WRITE_COMMAND = 0x52
ATT_SIGNED_WRITE_COMMAND = 0xD2
ATT_PREPARE_WRITE_REQUEST = 0x16
ATT_PREPARE_WRITE_RESPONSE = 0x17
ATT_EXECUTE_WRITE_REQUEST = 0x18
ATT_EXECUTE_WRITE_RESPONSE = 0x19
ATT_HANDLE_VALUE_NOTIFICATION = 0x1B
ATT_HANDLE_VALUE_INDICATION = 0x1D
ATT_HANDLE_VALUE_CONFIRMATION = 0x1E
ATT_PDU_NAMES = {
ATT_ERROR_RESPONSE: 'ATT_ERROR_RESPONSE',
ATT_EXCHANGE_MTU_REQUEST: 'ATT_EXCHANGE_MTU_REQUEST',
ATT_EXCHANGE_MTU_RESPONSE: 'ATT_EXCHANGE_MTU_RESPONSE',
ATT_FIND_INFORMATION_REQUEST: 'ATT_FIND_INFORMATION_REQUEST',
ATT_FIND_INFORMATION_RESPONSE: 'ATT_FIND_INFORMATION_RESPONSE',
ATT_FIND_BY_TYPE_VALUE_REQUEST: 'ATT_FIND_BY_TYPE_VALUE_REQUEST',
ATT_FIND_BY_TYPE_VALUE_RESPONSE: 'ATT_FIND_BY_TYPE_VALUE_RESPONSE',
ATT_READ_BY_TYPE_REQUEST: 'ATT_READ_BY_TYPE_REQUEST',
ATT_READ_BY_TYPE_RESPONSE: 'ATT_READ_BY_TYPE_RESPONSE',
ATT_READ_REQUEST: 'ATT_READ_REQUEST',
ATT_READ_RESPONSE: 'ATT_READ_RESPONSE',
ATT_READ_BLOB_REQUEST: 'ATT_READ_BLOB_REQUEST',
ATT_READ_BLOB_RESPONSE: 'ATT_READ_BLOB_RESPONSE',
ATT_READ_MULTIPLE_REQUEST: 'ATT_READ_MULTIPLE_REQUEST',
ATT_READ_MULTIPLE_RESPONSE: 'ATT_READ_MULTIPLE_RESPONSE',
ATT_READ_BY_GROUP_TYPE_REQUEST: 'ATT_READ_BY_GROUP_TYPE_REQUEST',
ATT_READ_BY_GROUP_TYPE_RESPONSE: 'ATT_READ_BY_GROUP_TYPE_RESPONSE',
ATT_WRITE_REQUEST: 'ATT_WRITE_REQUEST',
ATT_WRITE_RESPONSE: 'ATT_WRITE_RESPONSE',
ATT_WRITE_COMMAND: 'ATT_WRITE_COMMAND',
ATT_SIGNED_WRITE_COMMAND: 'ATT_SIGNED_WRITE_COMMAND',
ATT_PREPARE_WRITE_REQUEST: 'ATT_PREPARE_WRITE_REQUEST',
ATT_PREPARE_WRITE_RESPONSE: 'ATT_PREPARE_WRITE_RESPONSE',
ATT_EXECUTE_WRITE_REQUEST: 'ATT_EXECUTE_WRITE_REQUEST',
ATT_EXECUTE_WRITE_RESPONSE: 'ATT_EXECUTE_WRITE_RESPONSE',
ATT_HANDLE_VALUE_NOTIFICATION: 'ATT_HANDLE_VALUE_NOTIFICATION',
ATT_HANDLE_VALUE_INDICATION: 'ATT_HANDLE_VALUE_INDICATION',
ATT_HANDLE_VALUE_CONFIRMATION: 'ATT_HANDLE_VALUE_CONFIRMATION'
}
ATT_REQUESTS = [
ATT_EXCHANGE_MTU_REQUEST,
ATT_FIND_INFORMATION_REQUEST,
ATT_FIND_BY_TYPE_VALUE_REQUEST,
ATT_READ_BY_TYPE_REQUEST,
ATT_READ_REQUEST,
ATT_READ_BLOB_REQUEST,
ATT_READ_MULTIPLE_REQUEST,
ATT_READ_BY_GROUP_TYPE_REQUEST,
ATT_WRITE_REQUEST,
ATT_PREPARE_WRITE_REQUEST,
ATT_EXECUTE_WRITE_REQUEST
]
ATT_RESPONSES = [
ATT_ERROR_RESPONSE,
ATT_EXCHANGE_MTU_RESPONSE,
ATT_FIND_INFORMATION_RESPONSE,
ATT_FIND_BY_TYPE_VALUE_RESPONSE,
ATT_READ_BY_TYPE_RESPONSE,
ATT_READ_RESPONSE,
ATT_READ_BLOB_RESPONSE,
ATT_READ_MULTIPLE_RESPONSE,
ATT_READ_BY_GROUP_TYPE_RESPONSE,
ATT_WRITE_RESPONSE,
ATT_PREPARE_WRITE_RESPONSE,
ATT_EXECUTE_WRITE_RESPONSE
]
ATT_INVALID_HANDLE_ERROR = 0x01
ATT_READ_NOT_PERMITTED_ERROR = 0x02
ATT_WRITE_NOT_PERMITTED_ERROR = 0x03
ATT_INVALID_PDU_ERROR = 0x04
ATT_INSUFFICIENT_AUTHENTICATION_ERROR = 0x05
ATT_REQUEST_NOT_SUPPORTED_ERROR = 0x06
ATT_INVALID_OFFSET_ERROR = 0x07
ATT_INSUFFICIENT_AUTHORIZATION_ERROR = 0x08
ATT_PREPARE_QUEUE_FULL_ERROR = 0x09
ATT_ATTRIBUTE_NOT_FOUND_ERROR = 0x0A
ATT_ATTRIBUTE_NOT_LONG_ERROR = 0x0B
ATT_INSUFFICIENT_ENCRYPTION_KEY_SIZE_ERROR = 0x0C
ATT_INVALID_ATTRIBUTE_LENGTH_ERROR = 0x0D
ATT_UNLIKELY_ERROR_ERROR = 0x0E
ATT_INSUFFICIENT_ENCRYPTION_ERROR = 0x0F
ATT_UNSUPPORTED_GROUP_TYPE_ERROR = 0x10
ATT_INSUFFICIENT_RESOURCES_ERROR = 0x11
ATT_ERROR_NAMES = {
ATT_INVALID_HANDLE_ERROR: 'ATT_INVALID_HANDLE_ERROR',
ATT_READ_NOT_PERMITTED_ERROR: 'ATT_READ_NOT_PERMITTED_ERROR',
ATT_WRITE_NOT_PERMITTED_ERROR: 'ATT_WRITE_NOT_PERMITTED_ERROR',
ATT_INVALID_PDU_ERROR: 'ATT_INVALID_PDU_ERROR',
ATT_INSUFFICIENT_AUTHENTICATION_ERROR: 'ATT_INSUFFICIENT_AUTHENTICATION_ERROR',
ATT_REQUEST_NOT_SUPPORTED_ERROR: 'ATT_REQUEST_NOT_SUPPORTED_ERROR',
ATT_INVALID_OFFSET_ERROR: 'ATT_INVALID_OFFSET_ERROR',
ATT_INSUFFICIENT_AUTHORIZATION_ERROR: 'ATT_INSUFFICIENT_AUTHORIZATION_ERROR',
ATT_PREPARE_QUEUE_FULL_ERROR: 'ATT_PREPARE_QUEUE_FULL_ERROR',
ATT_ATTRIBUTE_NOT_FOUND_ERROR: 'ATT_ATTRIBUTE_NOT_FOUND_ERROR',
ATT_ATTRIBUTE_NOT_LONG_ERROR: 'ATT_ATTRIBUTE_NOT_LONG_ERROR',
ATT_INSUFFICIENT_ENCRYPTION_KEY_SIZE_ERROR: 'ATT_INSUFFICIENT_ENCRYPTION_KEY_SIZE_ERROR',
ATT_INVALID_ATTRIBUTE_LENGTH_ERROR: 'ATT_INVALID_ATTRIBUTE_LENGTH_ERROR',
ATT_UNLIKELY_ERROR_ERROR: 'ATT_UNLIKELY_ERROR_ERROR',
ATT_INSUFFICIENT_ENCRYPTION_ERROR: 'ATT_INSUFFICIENT_ENCRYPTION_ERROR',
ATT_UNSUPPORTED_GROUP_TYPE_ERROR: 'ATT_UNSUPPORTED_GROUP_TYPE_ERROR',
ATT_INSUFFICIENT_RESOURCES_ERROR: 'ATT_INSUFFICIENT_RESOURCES_ERROR'
}
ATT_DEFAULT_MTU = 23
HANDLE_FIELD_SPEC = {'size': 2, 'mapper': lambda x: f'0x{x:04X}'}
UUID_2_16_FIELD_SPEC = lambda x, y: UUID.parse_uuid(x, y) # noqa: E731
UUID_2_FIELD_SPEC = lambda x, y: UUID.parse_uuid_2(x, y) # noqa: E731
# -----------------------------------------------------------------------------
# Utils
# -----------------------------------------------------------------------------
def key_with_value(dictionary, target_value):
for key, value in dictionary.items():
if value == target_value:
return key
return None
# -----------------------------------------------------------------------------
# Exceptions
# -----------------------------------------------------------------------------
class ATT_Error(Exception):
def __init__(self, error_code, att_handle=0x0000):
self.error_code = error_code
self.att_handle = att_handle
def __str__(self):
return f'ATT_Error({ATT_PDU.error_name(self.error_code)})'
# -----------------------------------------------------------------------------
# Attribute Protocol
# -----------------------------------------------------------------------------
class ATT_PDU:
'''
See Bluetooth spec @ Vol 3, Part F - 3.3 ATTRIBUTE PDU
'''
pdu_classes = {}
op_code = 0
@staticmethod
def from_bytes(pdu):
op_code = pdu[0]
cls = ATT_PDU.pdu_classes.get(op_code)
if cls is None:
instance = ATT_PDU(pdu)
instance.name = ATT_PDU.pdu_name(op_code)
instance.op_code = op_code
return instance
self = cls.__new__(cls)
ATT_PDU.__init__(self, pdu)
if hasattr(self, 'fields'):
self.init_from_bytes(pdu, 1)
return self
@staticmethod
def pdu_name(op_code):
return name_or_number(ATT_PDU_NAMES, op_code, 2)
@staticmethod
def error_name(error_code):
return name_or_number(ATT_ERROR_NAMES, error_code, 2)
@staticmethod
def subclass(fields):
def inner(cls):
cls.name = cls.__name__.upper()
cls.op_code = key_with_value(ATT_PDU_NAMES, cls.name)
if cls.op_code is None:
raise KeyError(f'PDU name {cls.name} not found in ATT_PDU_NAMES')
cls.fields = fields
# Register a factory for this class
ATT_PDU.pdu_classes[cls.op_code] = cls
return cls
return inner
def __init__(self, pdu=None, **kwargs):
if hasattr(self, 'fields') and kwargs:
HCI_Object.init_from_fields(self, self.fields, kwargs)
if pdu is None:
pdu = bytes([self.op_code]) + HCI_Object.dict_to_bytes(kwargs, self.fields)
self.pdu = pdu
def init_from_bytes(self, pdu, offset):
return HCI_Object.init_from_bytes(self, pdu, offset, self.fields)
def to_bytes(self):
return self.pdu
@property
def is_command(self):
return ((self.op_code >> 6) & 1) == 1
@property
def has_authentication_signature(self):
return ((self.op_code >> 7) & 1) == 1
def __bytes__(self):
return self.to_bytes()
def __str__(self):
result = color(self.name, 'yellow')
if fields := getattr(self, 'fields', None):
result += ':\n' + HCI_Object.format_fields(self.__dict__, fields, ' ')
else:
if len(self.pdu) > 1:
result += f': {self.pdu.hex()}'
return result
# -----------------------------------------------------------------------------
@ATT_PDU.subclass([
('request_opcode_in_error', {'size': 1, 'mapper': ATT_PDU.pdu_name}),
('attribute_handle_in_error', HANDLE_FIELD_SPEC),
('error_code', {'size': 1, 'mapper': ATT_PDU.error_name})
])
class ATT_Error_Response(ATT_PDU):
'''
See Bluetooth spec @ Vol 3, Part F - 3.4.1.1 Error Response
'''
# -----------------------------------------------------------------------------
@ATT_PDU.subclass([
('client_rx_mtu', 2)
])
class ATT_Exchange_MTU_Request(ATT_PDU):
'''
See Bluetooth spec @ Vol 3, Part F - 3.4.2.1 Exchange MTU Request
'''
# -----------------------------------------------------------------------------
@ATT_PDU.subclass([
('server_rx_mtu', 2)
])
class ATT_Exchange_MTU_Response(ATT_PDU):
'''
See Bluetooth spec @ Vol 3, Part F - 3.4.2.2 Exchange MTU Response
'''
# -----------------------------------------------------------------------------
@ATT_PDU.subclass([
('starting_handle', HANDLE_FIELD_SPEC),
('ending_handle', HANDLE_FIELD_SPEC)
])
class ATT_Find_Information_Request(ATT_PDU):
'''
See Bluetooth spec @ Vol 3, Part F - 3.4.3.1 Find Information Request
'''
# -----------------------------------------------------------------------------
@ATT_PDU.subclass([
('format', 1),
('information_data', '*')
])
class ATT_Find_Information_Response(ATT_PDU):
'''
See Bluetooth spec @ Vol 3, Part F - 3.4.3.2 Find Information Response
'''
def parse_information_data(self):
self.information = []
offset = 0
uuid_size = 2 if self.format == 1 else 16
while offset + uuid_size <= len(self.information_data):
handle = struct.unpack_from('<H', self.information_data, offset)[0]
uuid = self.information_data[2 + offset:2 + offset + uuid_size]
self.information.append((handle, uuid))
offset += 2 + uuid_size
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.parse_information_data()
def init_from_bytes(self, pdu, offset):
super().init_from_bytes(pdu, offset)
self.parse_information_data()
def __str__(self):
result = color(self.name, 'yellow')
result += ':\n' + HCI_Object.format_fields(self.__dict__, [
('format', 1),
('information', {'mapper': lambda x: ', '.join([f'0x{handle:04X}:{uuid.hex()}' for handle, uuid in x])})
], ' ')
return result
# -----------------------------------------------------------------------------
@ATT_PDU.subclass([
('starting_handle', HANDLE_FIELD_SPEC),
('ending_handle', HANDLE_FIELD_SPEC),
('attribute_type', UUID_2_FIELD_SPEC),
('attribute_value', '*')
])
class ATT_Find_By_Type_Value_Request(ATT_PDU):
'''
See Bluetooth spec @ Vol 3, Part F - 3.4.3.3 Find By Type Value Request
'''
# -----------------------------------------------------------------------------
@ATT_PDU.subclass([
('handles_information_list', '*')
])
class ATT_Find_By_Type_Value_Response(ATT_PDU):
'''
See Bluetooth spec @ Vol 3, Part F - 3.4.3.4 Find By Type Value Response
'''
def parse_handles_information_list(self):
self.handles_information = []
offset = 0
while offset + 4 <= len(self.handles_information_list):
found_attribute_handle, group_end_handle = struct.unpack_from('<HH', self.handles_information_list, offset)
self.handles_information.append((found_attribute_handle, group_end_handle))
offset += 4
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.parse_handles_information_list()
def init_from_bytes(self, pdu, offset):
super().init_from_bytes(pdu, offset)
self.parse_handles_information_list()
def __str__(self):
result = color(self.name, 'yellow')
result += ':\n' + HCI_Object.format_fields(self.__dict__, [
('handles_information', {'mapper': lambda x: ', '.join([f'0x{handle1:04X}-0x{handle2:04X}' for handle1, handle2 in x])})
], ' ')
return result
# -----------------------------------------------------------------------------
@ATT_PDU.subclass([
('starting_handle', HANDLE_FIELD_SPEC),
('ending_handle', HANDLE_FIELD_SPEC),
('attribute_type', UUID_2_16_FIELD_SPEC)
])
class ATT_Read_By_Type_Request(ATT_PDU):
'''
See Bluetooth spec @ Vol 3, Part F - 3.4.4.1 Read By Type Request
'''
# -----------------------------------------------------------------------------
@ATT_PDU.subclass([
('length', 1),
('attribute_data_list', '*')
])
class ATT_Read_By_Type_Response(ATT_PDU):
'''
See Bluetooth spec @ Vol 3, Part F - 3.4.4.2 Read By Type Response
'''
def parse_attribute_data_list(self):
self.attributes = []
offset = 0
while self.length != 0 and offset + self.length <= len(self.attribute_data_list):
attribute_handle, = struct.unpack_from('<H', self.attribute_data_list, offset)
attribute_value = self.attribute_data_list[offset + 2:offset + self.length]
self.attributes.append((attribute_handle, attribute_value))
offset += self.length
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.parse_attribute_data_list()
def init_from_bytes(self, pdu, offset):
super().init_from_bytes(pdu, offset)
self.parse_attribute_data_list()
def __str__(self):
result = color(self.name, 'yellow')
result += ':\n' + HCI_Object.format_fields(self.__dict__, [
('length', 1),
('attributes', {'mapper': lambda x: ', '.join([f'0x{handle:04X}:{value.hex()}' for handle, value in x])})
], ' ')
return result
# -----------------------------------------------------------------------------
@ATT_PDU.subclass([
('attribute_handle', HANDLE_FIELD_SPEC)
])
class ATT_Read_Request(ATT_PDU):
'''
See Bluetooth spec @ Vol 3, Part F - 3.4.4.3 Read Request
'''
# -----------------------------------------------------------------------------
@ATT_PDU.subclass([
('attribute_value', '*')
])
class ATT_Read_Response(ATT_PDU):
'''
See Bluetooth spec @ Vol 3, Part F - 3.4.4.4 Read Response
'''
# -----------------------------------------------------------------------------
@ATT_PDU.subclass([
('attribute_handle', HANDLE_FIELD_SPEC),
('value_offset', 2)
])
class ATT_Read_Blob_Request(ATT_PDU):
'''
See Bluetooth spec @ Vol 3, Part F - 3.4.4.5 Read Blob Request
'''
# -----------------------------------------------------------------------------
@ATT_PDU.subclass([
('part_attribute_value', '*')
])
class ATT_Read_Blob_Response(ATT_PDU):
'''
See Bluetooth spec @ Vol 3, Part F - 3.4.4.6 Read Blob Response
'''
# -----------------------------------------------------------------------------
@ATT_PDU.subclass([
('set_of_handles', '*')
])
class ATT_Read_Multiple_Request(ATT_PDU):
'''
See Bluetooth spec @ Vol 3, Part F - 3.4.4.7 Read Multiple Request
'''
# -----------------------------------------------------------------------------
@ATT_PDU.subclass([
('set_of_values', '*')
])
class ATT_Read_Multiple_Response(ATT_PDU):
'''
See Bluetooth spec @ Vol 3, Part F - 3.4.4.8 Read Multiple Response
'''
# -----------------------------------------------------------------------------
@ATT_PDU.subclass([
('starting_handle', HANDLE_FIELD_SPEC),
('ending_handle', HANDLE_FIELD_SPEC),
('attribute_group_type', UUID_2_16_FIELD_SPEC)
])
class ATT_Read_By_Group_Type_Request(ATT_PDU):
'''
See Bluetooth spec @ Vol 3, Part F - 3.4.4.9 Read by Group Type Request
'''
# -----------------------------------------------------------------------------
@ATT_PDU.subclass([
('length', 1),
('attribute_data_list', '*')
])
class ATT_Read_By_Group_Type_Response(ATT_PDU):
'''
See Bluetooth spec @ Vol 3, Part F - 3.4.4.10 Read by Group Type Response
'''
def parse_attribute_data_list(self):
self.attributes = []
offset = 0
while self.length != 0 and offset + self.length <= len(self.attribute_data_list):
attribute_handle, end_group_handle = struct.unpack_from('<HH', self.attribute_data_list, offset)
attribute_value = self.attribute_data_list[offset + 4:offset + self.length]
self.attributes.append((attribute_handle, end_group_handle, attribute_value))
offset += self.length
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.parse_attribute_data_list()
def init_from_bytes(self, pdu, offset):
super().init_from_bytes(pdu, offset)
self.parse_attribute_data_list()
def __str__(self):
result = color(self.name, 'yellow')
result += ':\n' + HCI_Object.format_fields(self.__dict__, [
('length', 1),
('attributes', {'mapper': lambda x: ', '.join([f'0x{handle:04X}-0x{end:04X}:{value.hex()}' for handle, end, value in x])})
], ' ')
return result
# -----------------------------------------------------------------------------
@ATT_PDU.subclass([
('attribute_handle', HANDLE_FIELD_SPEC),
('attribute_value', '*')
])
class ATT_Write_Request(ATT_PDU):
'''
See Bluetooth spec @ Vol 3, Part F - 3.4.5.1 Write Request
'''
# -----------------------------------------------------------------------------
@ATT_PDU.subclass([])
class ATT_Write_Response(ATT_PDU):
'''
See Bluetooth spec @ Vol 3, Part F - 3.4.5.2 Write Response
'''
# -----------------------------------------------------------------------------
@ATT_PDU.subclass([
('attribute_handle', HANDLE_FIELD_SPEC),
('attribute_value', '*')
])
class ATT_Write_Command(ATT_PDU):
'''
See Bluetooth spec @ Vol 3, Part F - 3.4.5.3 Write Command
'''
# -----------------------------------------------------------------------------
@ATT_PDU.subclass([
('attribute_handle', HANDLE_FIELD_SPEC),
('attribute_value', '*')
# ('authentication_signature', 'TODO')
])
class ATT_Signed_Write_Command(ATT_PDU):
'''
See Bluetooth spec @ Vol 3, Part F - 3.4.5.4 Signed Write Command
'''
# -----------------------------------------------------------------------------
@ATT_PDU.subclass([
('attribute_handle', HANDLE_FIELD_SPEC),
('value_offset', 2),
('part_attribute_value', '*')
])
class ATT_Prepare_Write_Request(ATT_PDU):
'''
See Bluetooth spec @ Vol 3, Part F - 3.4.6.1 Prepare Write Request
'''
# -----------------------------------------------------------------------------
@ATT_PDU.subclass([
('attribute_handle', HANDLE_FIELD_SPEC),
('value_offset', 2),
('part_attribute_value', '*')
])
class ATT_Prepare_Write_Response(ATT_PDU):
'''
See Bluetooth spec @ Vol 3, Part F - 3.4.6.2 Prepare Write Response
'''
# -----------------------------------------------------------------------------
@ATT_PDU.subclass([])
class ATT_Execute_Write_Request(ATT_PDU):
'''
See Bluetooth spec @ Vol 3, Part F - 3.4.6.3 Execute Write Request
'''
# -----------------------------------------------------------------------------
@ATT_PDU.subclass([])
class ATT_Execute_Write_Response(ATT_PDU):
'''
See Bluetooth spec @ Vol 3, Part F - 3.4.6.4 Execute Write Response
'''
# -----------------------------------------------------------------------------
@ATT_PDU.subclass([
('attribute_handle', HANDLE_FIELD_SPEC),
('attribute_value', '*')
])
class ATT_Handle_Value_Notification(ATT_PDU):
'''
See Bluetooth spec @ Vol 3, Part F - 3.4.7.1 Handle Value Notification
'''
# -----------------------------------------------------------------------------
@ATT_PDU.subclass([
('attribute_handle', HANDLE_FIELD_SPEC),
('attribute_value', '*')
])
class ATT_Handle_Value_Indication(ATT_PDU):
'''
See Bluetooth spec @ Vol 3, Part F - 3.4.7.2 Handle Value Indication
'''
# -----------------------------------------------------------------------------
@ATT_PDU.subclass([])
class ATT_Handle_Value_Confirmation(ATT_PDU):
'''
See Bluetooth spec @ Vol 3, Part F - 3.4.7.3 Handle Value Confirmation
'''
# -----------------------------------------------------------------------------
class Attribute(EventEmitter):
# Permission flags
READABLE = 0x01
WRITEABLE = 0x02
READ_REQUIRES_ENCRYPTION = 0x04
WRITE_REQUIRES_ENCRYPTION = 0x08
READ_REQUIRES_AUTHENTICATION = 0x10
WRITE_REQUIRES_AUTHENTICATION = 0x20
READ_REQUIRES_AUTHORIZATION = 0x40
WRITE_REQUIRES_AUTHORIZATION = 0x80
def __init__(self, attribute_type, permissions, value = b''):
EventEmitter.__init__(self)
self.handle = 0
self.end_group_handle = 0
self.permissions = permissions
# Convert the type to a UUID object if it isn't already
if type(attribute_type) is str:
self.type = UUID(attribute_type)
elif type(attribute_type) is bytes:
self.type = UUID.from_bytes(attribute_type)
else:
self.type = attribute_type
# Convert the value to a byte array
if type(value) is str:
self.value = bytes(value, 'utf-8')
else:
self.value = value
def read_value(self, connection):
if read := getattr(self.value, 'read', None):
try:
return read(connection)
except ATT_Error as error:
raise ATT_Error(error_code=error.error_code, att_handle=self.handle)
else:
return self.value
def write_value(self, connection, value):
if write := getattr(self.value, 'write', None):
try:
write(connection, value)
except ATT_Error as error:
raise ATT_Error(error_code=error.error_code, att_handle=self.handle)
else:
self.value = value
self.emit('write', connection, value)
def __repr__(self):
if len(self.value) > 0:
value_string = f', value={self.value.hex()}'
else:
value_string = ''
return f'Attribute(handle=0x{self.handle:04X}, type={self.type}, permissions={self.permissions}{value_string})'