blob: c4da67e63a53d8d918c110571c1dab2170cf2fd0 [file] [log] [blame]
# Copyright 2021-2022 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# -----------------------------------------------------------------------------
# Imports
# -----------------------------------------------------------------------------
from bumble.hci import *
# -----------------------------------------------------------------------------
def basic_check(x):
    """Check that *x* survives a serialize/parse round trip.

    *x* is serialized with ``to_bytes()`` and parsed back with
    ``HCI_Packet.from_bytes()``. The parsed packet must have the same string
    form and the same serialized bytes as *x*. The hex dump of the serialized
    packet and the string form of *x* are printed along the way.
    """
    serialized = x.to_bytes()
    print(serialized.hex())
    reparsed = HCI_Packet.from_bytes(serialized)

    original_text = str(x)
    reparsed_text = str(reparsed)
    print(original_text)

    # Both the textual form and the serialized bytes must match.
    reserialized = reparsed.to_bytes()
    assert original_text == reparsed_text
    assert serialized == reserialized
# -----------------------------------------------------------------------------
def test_HCI_Event():
    """Run generic HCI_Event objects, with and without parameters, through basic_check."""
    # Event code only
    bare_event = HCI_Event(0xF9)
    basic_check(bare_event)

    # Event code followed by raw parameter bytes
    event_with_parameters = HCI_Event(0xF8, bytes.fromhex('AABBCC'))
    basic_check(event_with_parameters)
# -----------------------------------------------------------------------------
def test_HCI_LE_Connection_Complete_Event():
    """Run an HCI_LE_Connection_Complete_Event with sample values through basic_check."""
    peer = Address('00:11:22:33:44:55')
    basic_check(
        HCI_LE_Connection_Complete_Event(
            status=HCI_SUCCESS,
            connection_handle=1,
            role=1,
            peer_address_type=1,
            peer_address=peer,
            conn_interval=3,
            conn_latency=4,
            supervision_timeout=5,
            master_clock_accuracy=6,
        )
    )
# -----------------------------------------------------------------------------
def test_HCI_LE_Advertising_Report_Event():
    """Run an HCI_LE_Advertising_Report_Event holding one report through basic_check."""
    advertiser = Address('00:11:22:33:44:55')
    advertising_data = bytes.fromhex(
        '0201061106ba5689a6fabfa2bd01467d6e00fbabad08160a181604659b03'
    )

    # A single report built from the event's own field description
    single_report = HCI_Object(
        HCI_LE_Advertising_Report_Event.REPORT_FIELDS,
        event_type=HCI_LE_Advertising_Report_Event.ADV_IND,
        address_type=Address.PUBLIC_DEVICE_ADDRESS,
        address=advertiser,
        data=advertising_data,
        rssi=100,
    )
    basic_check(HCI_LE_Advertising_Report_Event([single_report]))
# -----------------------------------------------------------------------------
def test_HCI_LE_Read_Remote_Features_Complete_Event():
    """Run an HCI_LE_Read_Remote_Features_Complete_Event through basic_check."""
    features = bytes.fromhex('0011223344556677')
    basic_check(
        HCI_LE_Read_Remote_Features_Complete_Event(
            status=HCI_SUCCESS,
            connection_handle=0x007,
            le_features=features,
        )
    )
# -----------------------------------------------------------------------------
def test_HCI_LE_Connection_Update_Complete_Event():
    """Run an HCI_LE_Connection_Update_Complete_Event through basic_check."""
    basic_check(
        HCI_LE_Connection_Update_Complete_Event(
            status=HCI_SUCCESS,
            connection_handle=0x007,
            conn_interval=10,
            conn_latency=3,
            supervision_timeout=5,
        )
    )
# -----------------------------------------------------------------------------
def test_HCI_LE_Channel_Selection_Algorithm_Event():
    """Run an HCI_LE_Channel_Selection_Algorithm_Event through basic_check."""
    basic_check(
        HCI_LE_Channel_Selection_Algorithm_Event(
            connection_handle=7,
            channel_selection_algorithm=1,
        )
    )
# -----------------------------------------------------------------------------
def test_HCI_Command_Complete_Event():
    """Run HCI_Command_Complete_Event through basic_check with four kinds of return parameters.

    The return parameters are given as a serializable object, as an arbitrary
    byte array, as a 1-byte array and as a plain integer.
    """
    # Case 1: return parameters as a serializable object
    buffer_size_result = HCI_LE_Read_Buffer_Size_Command.create_return_parameters(
        status=0,
        hc_le_acl_data_packet_length=1234,
        hc_total_num_le_acl_data_packets=56,
    )
    with_object = HCI_Command_Complete_Event(
        num_hci_command_packets=34,
        command_opcode=HCI_LE_READ_BUFFER_SIZE_COMMAND,
        return_parameters=buffer_size_result,
    )
    basic_check(with_object)

    # Case 2: return parameters as an arbitrary byte array
    with_raw_bytes = HCI_Command_Complete_Event(
        num_hci_command_packets=1,
        command_opcode=HCI_RESET_COMMAND,
        return_parameters=bytes([1, 2, 3, 4]),
    )
    basic_check(with_raw_bytes)

    # Case 3: a simple status given as a 1-byte array; once parsed back from
    # its serialized form, it must compare equal to the integer 7
    with_status_byte = HCI_Command_Complete_Event(
        num_hci_command_packets=1,
        command_opcode=HCI_RESET_COMMAND,
        return_parameters=bytes([7]),
    )
    basic_check(with_status_byte)
    reparsed = HCI_Packet.from_bytes(with_status_byte.to_bytes())
    assert reparsed.return_parameters == 7

    # Case 4: a simple status given directly as an integer
    with_status_int = HCI_Command_Complete_Event(
        num_hci_command_packets=1,
        command_opcode=HCI_RESET_COMMAND,
        return_parameters=9,
    )
    basic_check(with_status_int)
    assert with_status_int.return_parameters == 9
# -----------------------------------------------------------------------------
def test_HCI_Command_Status_Event():
    """Run an HCI_Command_Status_Event for the disconnect opcode through basic_check."""
    basic_check(
        HCI_Command_Status_Event(
            status=0,
            num_hci_command_packets=37,
            command_opcode=HCI_DISCONNECT_COMMAND,
        )
    )
# -----------------------------------------------------------------------------
def test_HCI_Number_Of_Completed_Packets_Event():
    """Run an HCI_Number_Of_Completed_Packets_Event with two entries through basic_check."""
    entries = [(1, 2), (3, 4)]
    basic_check(HCI_Number_Of_Completed_Packets_Event(entries))
# -----------------------------------------------------------------------------
def test_HCI_Command():
    """Run generic HCI_Command objects, with and without parameters, through basic_check."""
    # Opcode only
    bare_command = HCI_Command(0x5566)
    basic_check(bare_command)

    # Opcode followed by raw parameter bytes
    command_with_parameters = HCI_Command(0x5566, bytes.fromhex('AABBCC'))
    basic_check(command_with_parameters)
# -----------------------------------------------------------------------------
def test_HCI_Reset_Command():
    """Run a default-constructed HCI_Reset_Command through basic_check."""
    basic_check(HCI_Reset_Command())
# -----------------------------------------------------------------------------
def test_HCI_Read_Local_Version_Information_Command():
    """Run a default-constructed HCI_Read_Local_Version_Information_Command through basic_check."""
    basic_check(HCI_Read_Local_Version_Information_Command())
# -----------------------------------------------------------------------------
def test_HCI_Read_Local_Supported_Commands_Command():
    """Run a default-constructed HCI_Read_Local_Supported_Commands_Command through basic_check."""
    basic_check(HCI_Read_Local_Supported_Commands_Command())
# -----------------------------------------------------------------------------
def test_HCI_Read_Local_Supported_Features_Command():
    """Run a default-constructed HCI_Read_Local_Supported_Features_Command through basic_check."""
    basic_check(HCI_Read_Local_Supported_Features_Command())
# -----------------------------------------------------------------------------
def test_HCI_Disconnect_Command():
    """Run an HCI_Disconnect_Command with a sample handle and reason through basic_check."""
    basic_check(
        HCI_Disconnect_Command(
            connection_handle=123,
            reason=0x11,
        )
    )
# -----------------------------------------------------------------------------
def test_HCI_Set_Event_Mask_Command():
    """Run an HCI_Set_Event_Mask_Command with an 8-byte mask through basic_check."""
    mask = bytes.fromhex('0011223344556677')
    basic_check(HCI_Set_Event_Mask_Command(event_mask=mask))
# -----------------------------------------------------------------------------
def test_HCI_LE_Set_Event_Mask_Command():
    """Run an HCI_LE_Set_Event_Mask_Command with an 8-byte mask through basic_check."""
    mask = bytes.fromhex('0011223344556677')
    basic_check(HCI_LE_Set_Event_Mask_Command(le_event_mask=mask))
# -----------------------------------------------------------------------------
def test_HCI_LE_Set_Random_Address_Command():
    """Run an HCI_LE_Set_Random_Address_Command with a sample address through basic_check."""
    sample_address = Address('00:11:22:33:44:55')
    basic_check(HCI_LE_Set_Random_Address_Command(random_address=sample_address))
# -----------------------------------------------------------------------------
def test_HCI_LE_Set_Advertising_Parameters_Command():
    """Run an HCI_LE_Set_Advertising_Parameters_Command with sample values through basic_check."""
    basic_check(
        HCI_LE_Set_Advertising_Parameters_Command(
            advertising_interval_min=20,
            advertising_interval_max=30,
            advertising_type=HCI_LE_Set_Advertising_Parameters_Command.ADV_NONCONN_IND,
            own_address_type=Address.PUBLIC_DEVICE_ADDRESS,
            peer_address_type=Address.RANDOM_DEVICE_ADDRESS,
            peer_address=Address('00:11:22:33:44:55'),
            advertising_channel_map=0x03,
            advertising_filter_policy=1,
        )
    )
# -----------------------------------------------------------------------------
def test_HCI_LE_Set_Advertising_Data_Command():
    """Run an HCI_LE_Set_Advertising_Data_Command with three data bytes through basic_check."""
    payload = bytes.fromhex('AABBCC')
    basic_check(HCI_LE_Set_Advertising_Data_Command(advertising_data=payload))
# -----------------------------------------------------------------------------
def test_HCI_LE_Set_Scan_Parameters_Command():
    """Run an HCI_LE_Set_Scan_Parameters_Command with sample values through basic_check."""
    basic_check(
        HCI_LE_Set_Scan_Parameters_Command(
            le_scan_type=1,
            le_scan_interval=20,
            le_scan_window=10,
            own_address_type=1,
            scanning_filter_policy=0,
        )
    )
# -----------------------------------------------------------------------------
def test_HCI_LE_Set_Scan_Enable_Command():
    """Run an HCI_LE_Set_Scan_Enable_Command with sample values through basic_check."""
    basic_check(
        HCI_LE_Set_Scan_Enable_Command(
            le_scan_enable=1,
            filter_duplicates=0,
        )
    )
# -----------------------------------------------------------------------------
def test_HCI_LE_Create_Connection_Command():
    """Run an HCI_LE_Create_Connection_Command with sample values through basic_check."""
    basic_check(
        HCI_LE_Create_Connection_Command(
            le_scan_interval=4,
            le_scan_window=5,
            initiator_filter_policy=1,
            peer_address_type=1,
            peer_address=Address('00:11:22:33:44:55'),
            own_address_type=2,
            conn_interval_min=7,
            conn_interval_max=8,
            conn_latency=9,
            supervision_timeout=10,
            minimum_ce_length=11,
            maximum_ce_length=12,
        )
    )
# -----------------------------------------------------------------------------
def test_HCI_LE_Add_Device_To_White_List_Command():
    """Run an HCI_LE_Add_Device_To_White_List_Command with a sample address through basic_check."""
    basic_check(
        HCI_LE_Add_Device_To_White_List_Command(
            address_type=1,
            address=Address('00:11:22:33:44:55'),
        )
    )
# -----------------------------------------------------------------------------
def test_HCI_LE_Remove_Device_From_White_List_Command():
    """Run an HCI_LE_Remove_Device_From_White_List_Command with a sample address through basic_check."""
    basic_check(
        HCI_LE_Remove_Device_From_White_List_Command(
            address_type=1,
            address=Address('00:11:22:33:44:55'),
        )
    )
# -----------------------------------------------------------------------------
def test_HCI_LE_Connection_Update_Command():
    """Run an HCI_LE_Connection_Update_Command with sample values through basic_check."""
    basic_check(
        HCI_LE_Connection_Update_Command(
            connection_handle=0x0002,
            conn_interval_min=10,
            conn_interval_max=20,
            conn_latency=7,
            supervision_timeout=3,
            minimum_ce_length=100,
            maximum_ce_length=200,
        )
    )
# -----------------------------------------------------------------------------
def test_HCI_LE_Read_Remote_Features_Command():
    """Run an HCI_LE_Read_Remote_Features_Command with a sample handle through basic_check."""
    basic_check(HCI_LE_Read_Remote_Features_Command(connection_handle=0x0002))
# -----------------------------------------------------------------------------
def test_HCI_LE_Set_Default_PHY_Command():
    """Run an HCI_LE_Set_Default_PHY_Command with sample values through basic_check."""
    basic_check(
        HCI_LE_Set_Default_PHY_Command(
            all_phys=0,
            tx_phys=1,
            rx_phys=1,
        )
    )
# -----------------------------------------------------------------------------
def test_address():
    """Check the classification properties of an Address built from 'C4:F2:17:1A:1D:BB'.

    The address is expected to be random and static, and neither public,
    resolvable nor resolved.
    """
    address = Address('C4:F2:17:1A:1D:BB')

    # Public vs. random
    assert not address.is_public
    assert address.is_random
    assert address.address_type == Address.RANDOM_DEVICE_ADDRESS

    # Resolvable/resolved vs. static
    assert not address.is_resolvable
    assert not address.is_resolved
    assert address.is_static
# -----------------------------------------------------------------------------
def test_custom():
    """Check the packet type and payload exposed by an HCI_CustomPacket.

    The packet type is expected to be 0x77, the first byte of the input, and
    the payload is expected to equal the whole input.
    """
    raw = bytes([0x77, 0x02, 0x01, 0x03])
    custom_packet = HCI_CustomPacket(raw)
    assert custom_packet.hci_packet_type == 0x77
    assert custom_packet.payload == raw
# -----------------------------------------------------------------------------
def run_test_events():
    """Call every HCI event test in this module, in a fixed order."""
    event_tests = (
        test_HCI_Event,
        test_HCI_LE_Connection_Complete_Event,
        test_HCI_LE_Advertising_Report_Event,
        test_HCI_LE_Connection_Update_Complete_Event,
        test_HCI_LE_Read_Remote_Features_Complete_Event,
        test_HCI_LE_Channel_Selection_Algorithm_Event,
        test_HCI_Command_Complete_Event,
        test_HCI_Command_Status_Event,
        test_HCI_Number_Of_Completed_Packets_Event,
    )
    for run_one in event_tests:
        run_one()
# -----------------------------------------------------------------------------
def run_test_commands():
    """Call every HCI command test in this module, in a fixed order."""
    command_tests = (
        test_HCI_Command,
        test_HCI_Reset_Command,
        test_HCI_Read_Local_Version_Information_Command,
        test_HCI_Read_Local_Supported_Commands_Command,
        test_HCI_Read_Local_Supported_Features_Command,
        test_HCI_Disconnect_Command,
        test_HCI_Set_Event_Mask_Command,
        test_HCI_LE_Set_Event_Mask_Command,
        test_HCI_LE_Set_Random_Address_Command,
        test_HCI_LE_Set_Advertising_Parameters_Command,
        test_HCI_LE_Set_Advertising_Data_Command,
        test_HCI_LE_Set_Scan_Parameters_Command,
        test_HCI_LE_Set_Scan_Enable_Command,
        test_HCI_LE_Create_Connection_Command,
        test_HCI_LE_Add_Device_To_White_List_Command,
        test_HCI_LE_Remove_Device_From_White_List_Command,
        test_HCI_LE_Connection_Update_Command,
        test_HCI_LE_Read_Remote_Features_Command,
        test_HCI_LE_Set_Default_PHY_Command,
    )
    for run_one in command_tests:
        run_one()
# -----------------------------------------------------------------------------
# When executed as a script, run the event tests, the command tests, the
# address test and the custom packet test, in that order.
if __name__ == '__main__':
    for entry_point in (run_test_events, run_test_commands, test_address, test_custom):
        entry_point()