blob: dbb2b7a6014dd6e50bd2de16079ed3c492a667e2 [file] [log] [blame]
# Copyright 2021-2022 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# -----------------------------------------------------------------------------
# SMP - Security Manager Protocol
#
# See Bluetooth spec @ Vol 3, Part H
#
# -----------------------------------------------------------------------------
# -----------------------------------------------------------------------------
# Imports
# -----------------------------------------------------------------------------
import logging
import asyncio
import secrets
from pyee import EventEmitter
from colors import color
from .core import *
from .hci import *
from .keys import PairingKeys
from . import crypto
# -----------------------------------------------------------------------------
# Logging
# -----------------------------------------------------------------------------
logger = logging.getLogger(__name__)
# -----------------------------------------------------------------------------
# Constants
# -----------------------------------------------------------------------------
# SMP channel identifier
SMP_CID = 0x06

# SMP command codes
SMP_PAIRING_REQUEST_COMMAND               = 0x01
SMP_PAIRING_RESPONSE_COMMAND              = 0x02
SMP_PAIRING_CONFIRM_COMMAND               = 0x03
SMP_PAIRING_RANDOM_COMMAND                = 0x04
SMP_PAIRING_FAILED_COMMAND                = 0x05
SMP_ENCRYPTION_INFORMATION_COMMAND        = 0x06
SMP_MASTER_IDENTIFICATION_COMMAND         = 0x07
SMP_IDENTITY_INFORMATION_COMMAND          = 0x08
SMP_IDENTITY_ADDRESS_INFORMATION_COMMAND  = 0x09
SMP_SIGNING_INFORMATION_COMMAND           = 0x0A
SMP_SECURITY_REQUEST_COMMAND              = 0x0B
SMP_PAIRING_PUBLIC_KEY_COMMAND            = 0x0C
SMP_PAIRING_DHKEY_CHECK_COMMAND           = 0x0D
SMP_PAIRING_KEYPRESS_NOTIFICATION_COMMAND = 0x0E

# Command code -> symbolic name
SMP_COMMAND_NAMES = {
    SMP_PAIRING_REQUEST_COMMAND:               'SMP_PAIRING_REQUEST_COMMAND',
    SMP_PAIRING_RESPONSE_COMMAND:              'SMP_PAIRING_RESPONSE_COMMAND',
    SMP_PAIRING_CONFIRM_COMMAND:               'SMP_PAIRING_CONFIRM_COMMAND',
    SMP_PAIRING_RANDOM_COMMAND:                'SMP_PAIRING_RANDOM_COMMAND',
    SMP_PAIRING_FAILED_COMMAND:                'SMP_PAIRING_FAILED_COMMAND',
    SMP_ENCRYPTION_INFORMATION_COMMAND:        'SMP_ENCRYPTION_INFORMATION_COMMAND',
    SMP_MASTER_IDENTIFICATION_COMMAND:         'SMP_MASTER_IDENTIFICATION_COMMAND',
    SMP_IDENTITY_INFORMATION_COMMAND:          'SMP_IDENTITY_INFORMATION_COMMAND',
    SMP_IDENTITY_ADDRESS_INFORMATION_COMMAND:  'SMP_IDENTITY_ADDRESS_INFORMATION_COMMAND',
    SMP_SIGNING_INFORMATION_COMMAND:           'SMP_SIGNING_INFORMATION_COMMAND',
    SMP_SECURITY_REQUEST_COMMAND:              'SMP_SECURITY_REQUEST_COMMAND',
    SMP_PAIRING_PUBLIC_KEY_COMMAND:            'SMP_PAIRING_PUBLIC_KEY_COMMAND',
    SMP_PAIRING_DHKEY_CHECK_COMMAND:           'SMP_PAIRING_DHKEY_CHECK_COMMAND',
    SMP_PAIRING_KEYPRESS_NOTIFICATION_COMMAND: 'SMP_PAIRING_KEYPRESS_NOTIFICATION_COMMAND',
}
# IO capability values
SMP_DISPLAY_ONLY_IO_CAPABILITY       = 0x00
SMP_DISPLAY_YES_NO_IO_CAPABILITY     = 0x01
SMP_KEYBOARD_ONLY_IO_CAPABILITY      = 0x02
SMP_NO_INPUT_NO_OUTPUT_IO_CAPABILITY = 0x03
SMP_KEYBOARD_DISPLAY_IO_CAPABILITY   = 0x04

# IO capability value -> symbolic name
SMP_IO_CAPABILITY_NAMES = {
    SMP_DISPLAY_ONLY_IO_CAPABILITY:       'SMP_DISPLAY_ONLY_IO_CAPABILITY',
    SMP_DISPLAY_YES_NO_IO_CAPABILITY:     'SMP_DISPLAY_YES_NO_IO_CAPABILITY',
    SMP_KEYBOARD_ONLY_IO_CAPABILITY:      'SMP_KEYBOARD_ONLY_IO_CAPABILITY',
    SMP_NO_INPUT_NO_OUTPUT_IO_CAPABILITY: 'SMP_NO_INPUT_NO_OUTPUT_IO_CAPABILITY',
    SMP_KEYBOARD_DISPLAY_IO_CAPABILITY:   'SMP_KEYBOARD_DISPLAY_IO_CAPABILITY',
}
# Error codes (used as the `reason` of a Pairing Failed command)
SMP_PASSKEY_ENTRY_FAILED_ERROR                       = 0x01
SMP_OOB_NOT_AVAILABLE_ERROR                          = 0x02
SMP_AUTHENTICATION_REQUIREMENTS_ERROR                = 0x03
SMP_CONFIRM_VALUE_FAILED_ERROR                       = 0x04
SMP_PAIRING_NOT_SUPPORTED_ERROR                      = 0x05
SMP_ENCRYPTION_KEY_SIZE_ERROR                        = 0x06
SMP_COMMAND_NOT_SUPPORTED_ERROR                      = 0x07
SMP_UNSPECIFIED_REASON_ERROR                         = 0x08
SMP_REPEATED_ATTEMPTS_ERROR                          = 0x09
SMP_INVALID_PARAMETERS_ERROR                         = 0x0A
SMP_DHKEY_CHECK_FAILED_ERROR                         = 0x0B
SMP_NUMERIC_COMPARISON_FAILED_ERROR                  = 0x0C
SMP_BD_EDR_PAIRING_IN_PROGRESS_ERROR                 = 0x0D
SMP_CROSS_TRANSPORT_KEY_DERIVATION_NOT_ALLOWED_ERROR = 0x0E

# Error code -> symbolic name
SMP_ERROR_NAMES = {
    SMP_PASSKEY_ENTRY_FAILED_ERROR:                       'SMP_PASSKEY_ENTRY_FAILED_ERROR',
    SMP_OOB_NOT_AVAILABLE_ERROR:                          'SMP_OOB_NOT_AVAILABLE_ERROR',
    SMP_AUTHENTICATION_REQUIREMENTS_ERROR:                'SMP_AUTHENTICATION_REQUIREMENTS_ERROR',
    SMP_CONFIRM_VALUE_FAILED_ERROR:                       'SMP_CONFIRM_VALUE_FAILED_ERROR',
    SMP_PAIRING_NOT_SUPPORTED_ERROR:                      'SMP_PAIRING_NOT_SUPPORTED_ERROR',
    SMP_ENCRYPTION_KEY_SIZE_ERROR:                        'SMP_ENCRYPTION_KEY_SIZE_ERROR',
    SMP_COMMAND_NOT_SUPPORTED_ERROR:                      'SMP_COMMAND_NOT_SUPPORTED_ERROR',
    SMP_UNSPECIFIED_REASON_ERROR:                         'SMP_UNSPECIFIED_REASON_ERROR',
    SMP_REPEATED_ATTEMPTS_ERROR:                          'SMP_REPEATED_ATTEMPTS_ERROR',
    SMP_INVALID_PARAMETERS_ERROR:                         'SMP_INVALID_PARAMETERS_ERROR',
    SMP_DHKEY_CHECK_FAILED_ERROR:                         'SMP_DHKEY_CHECK_FAILED_ERROR',
    SMP_NUMERIC_COMPARISON_FAILED_ERROR:                  'SMP_NUMERIC_COMPARISON_FAILED_ERROR',
    SMP_BD_EDR_PAIRING_IN_PROGRESS_ERROR:                 'SMP_BD_EDR_PAIRING_IN_PROGRESS_ERROR',
    SMP_CROSS_TRANSPORT_KEY_DERIVATION_NOT_ALLOWED_ERROR: 'SMP_CROSS_TRANSPORT_KEY_DERIVATION_NOT_ALLOWED_ERROR',
}
# Keypress notification types
SMP_PASSKEY_ENTRY_STARTED_KEYPRESS_NOTIFICATION_TYPE   = 0
SMP_PASSKEY_DIGIT_ENTERED_KEYPRESS_NOTIFICATION_TYPE   = 1
SMP_PASSKEY_DIGIT_ERASED_KEYPRESS_NOTIFICATION_TYPE    = 2
SMP_PASSKEY_CLEARED_KEYPRESS_NOTIFICATION_TYPE         = 3
SMP_PASSKEY_ENTRY_COMPLETED_KEYPRESS_NOTIFICATION_TYPE = 4

# Keypress notification type -> symbolic name
SMP_KEYPRESS_NOTIFICATION_TYPE_NAMES = {
    SMP_PASSKEY_ENTRY_STARTED_KEYPRESS_NOTIFICATION_TYPE:
        'SMP_PASSKEY_ENTRY_STARTED_KEYPRESS_NOTIFICATION_TYPE',
    SMP_PASSKEY_DIGIT_ENTERED_KEYPRESS_NOTIFICATION_TYPE:
        'SMP_PASSKEY_DIGIT_ENTERED_KEYPRESS_NOTIFICATION_TYPE',
    SMP_PASSKEY_DIGIT_ERASED_KEYPRESS_NOTIFICATION_TYPE:
        'SMP_PASSKEY_DIGIT_ERASED_KEYPRESS_NOTIFICATION_TYPE',
    SMP_PASSKEY_CLEARED_KEYPRESS_NOTIFICATION_TYPE:
        'SMP_PASSKEY_CLEARED_KEYPRESS_NOTIFICATION_TYPE',
    SMP_PASSKEY_ENTRY_COMPLETED_KEYPRESS_NOTIFICATION_TYPE:
        'SMP_PASSKEY_ENTRY_COMPLETED_KEYPRESS_NOTIFICATION_TYPE',
}
# Bit flags for the initiator/responder key distribution/generation fields
SMP_ENC_KEY_DISTRIBUTION_FLAG  = 1 << 0
SMP_ID_KEY_DISTRIBUTION_FLAG   = 1 << 1
SMP_SIGN_KEY_DISTRIBUTION_FLAG = 1 << 2
SMP_LINK_KEY_DISTRIBUTION_FLAG = 1 << 3

# AuthReq bit masks (bit 1 is not given a name here)
SMP_BONDING_AUTHREQ  = 1 << 0
SMP_MITM_AUTHREQ     = 1 << 2
SMP_SC_AUTHREQ       = 1 << 3
SMP_KEYPRESS_AUTHREQ = 1 << 4
SMP_CT2_AUTHREQ      = 1 << 5

# Salt passed to crypto.h7 when deriving the BR/EDR link key from the LTK
# NOTE(review): the Core spec describes the h7 SALT as a 128-bit value; confirm
# that the length of this literal matches what crypto.h7 expects.
SMP_CTKD_H7_LEBR_SALT = bytes.fromhex('00000000000000000000000000000000746D7031')
# -----------------------------------------------------------------------------
# Utils
# -----------------------------------------------------------------------------
def error_name(error_code):
    '''Look up `error_code` in SMP_ERROR_NAMES using `name_or_number`.'''
    names = SMP_ERROR_NAMES
    return name_or_number(names, error_code)
# -----------------------------------------------------------------------------
# Classes
# -----------------------------------------------------------------------------
class SMP_Command:
    '''
    Base class for SMP commands. A command wraps a PDU whose first byte is the
    command code.

    See Bluetooth spec @ Vol 3, Part H - 3 SECURITY MANAGER PROTOCOL
    '''

    # Registry: command code -> class, populated by the `subclass` decorator
    smp_classes = {}
    code = 0

    @staticmethod
    def from_bytes(pdu):
        '''
        Build a command object from a raw PDU, using the class registered for
        the PDU's code when there is one, or a generic SMP_Command otherwise.
        '''
        code = pdu[0]
        command_class = SMP_Command.smp_classes.get(code)

        if command_class is not None:
            # Skip the subclass constructor: fields are parsed from the PDU
            command = command_class.__new__(command_class)
            SMP_Command.__init__(command, pdu)
            if hasattr(command, 'fields'):
                command.init_from_bytes(pdu, 1)
            return command

        # No registered class for this code: keep the raw PDU only
        command = SMP_Command(pdu)
        command.name = SMP_Command.command_name(code)
        command.code = code
        return command

    @staticmethod
    def command_name(code):
        '''Look up a command code in SMP_COMMAND_NAMES.'''
        return name_or_number(SMP_COMMAND_NAMES, code)

    @staticmethod
    def auth_req_str(value):
        '''Format an AuthReq byte as a readable list of its bit fields.'''
        bonding_flags = value & 0b11
        mitm, sc, keypress, ct2 = [(value >> shift) & 1 for shift in (2, 3, 4, 5)]
        return f'bonding_flags={bonding_flags}, MITM={mitm}, sc={sc}, keypress={keypress}, ct2={ct2}'

    @staticmethod
    def io_capability_name(io_capability):
        '''Look up an IO capability value in SMP_IO_CAPABILITY_NAMES.'''
        return name_or_number(SMP_IO_CAPABILITY_NAMES, io_capability)

    @staticmethod
    def key_distribution_str(value):
        '''Format a key distribution mask as a comma-separated list of key types.'''
        labels = (
            (SMP_ENC_KEY_DISTRIBUTION_FLAG, 'ENC'),
            (SMP_ID_KEY_DISTRIBUTION_FLAG, 'ID'),
            (SMP_SIGN_KEY_DISTRIBUTION_FLAG, 'SIGN'),
            (SMP_LINK_KEY_DISTRIBUTION_FLAG, 'LINK'),
        )
        return ','.join(label for flag, label in labels if value & flag)

    @staticmethod
    def keypress_notification_type_name(notification_type):
        '''Look up a notification type in SMP_KEYPRESS_NOTIFICATION_TYPE_NAMES.'''
        return name_or_number(SMP_KEYPRESS_NOTIFICATION_TYPE_NAMES, notification_type)

    @staticmethod
    def subclass(fields):
        '''
        Class decorator that sets `name`, `code` and `fields` on the decorated
        class and registers it in `smp_classes`. The name is the upper-cased
        class name, and the code is found by looking that name up in
        SMP_COMMAND_NAMES (KeyError if it is not there).
        '''
        def inner(cls):
            name = cls.__name__.upper()
            code = key_with_value(SMP_COMMAND_NAMES, name)
            cls.name = name
            cls.code = code
            if code is None:
                raise KeyError(f'Command name {cls.name} not found in SMP_COMMAND_NAMES')
            cls.fields = fields

            # Register a factory for this class
            SMP_Command.smp_classes[code] = cls
            return cls

        return inner

    def __init__(self, pdu=None, **kwargs):
        '''
        Initialize from a raw `pdu`, or from field values passed as keyword
        arguments (in which case the PDU is serialized from them).
        '''
        has_fields = hasattr(self, 'fields')
        if has_fields and kwargs:
            HCI_Object.init_from_fields(self, self.fields, kwargs)

        if pdu is None:
            payload = HCI_Object.dict_to_bytes(kwargs, self.fields)
            pdu = bytes([self.code]) + payload
        self.pdu = pdu

    def init_from_bytes(self, pdu, offset):
        '''Populate this object's fields from `pdu`, starting at `offset`.'''
        return HCI_Object.init_from_bytes(self, pdu, offset, self.fields)

    def to_bytes(self):
        '''Return the raw PDU.'''
        return self.pdu

    def __bytes__(self):
        return self.to_bytes()

    def __str__(self):
        text = color(self.name, 'yellow')
        if fields := getattr(self, 'fields', None):
            text += ':\n' + HCI_Object.format_fields(self.__dict__, fields, ' ')
        elif len(self.pdu) > 1:
            # No field description: show the PDU as hex
            text += f': {self.pdu.hex()}'
        return text
# -----------------------------------------------------------------------------
@SMP_Command.subclass(
    [
        ('io_capability', {'size': 1, 'mapper': SMP_Command.io_capability_name}),
        ('oob_data_flag', 1),
        ('auth_req', {'size': 1, 'mapper': SMP_Command.auth_req_str}),
        ('maximum_encryption_key_size', 1),
        ('initiator_key_distribution', {'size': 1, 'mapper': SMP_Command.key_distribution_str}),
        ('responder_key_distribution', {'size': 1, 'mapper': SMP_Command.key_distribution_str}),
    ]
)
class SMP_Pairing_Request_Command(SMP_Command):
    '''
    Pairing Request command.

    See Bluetooth spec @ Vol 3, Part H - 3.5.1 Pairing Request
    '''
# -----------------------------------------------------------------------------
@SMP_Command.subclass(
    [
        ('io_capability', {'size': 1, 'mapper': SMP_Command.io_capability_name}),
        ('oob_data_flag', 1),
        ('auth_req', {'size': 1, 'mapper': SMP_Command.auth_req_str}),
        ('maximum_encryption_key_size', 1),
        ('initiator_key_distribution', {'size': 1, 'mapper': SMP_Command.key_distribution_str}),
        ('responder_key_distribution', {'size': 1, 'mapper': SMP_Command.key_distribution_str}),
    ]
)
class SMP_Pairing_Response_Command(SMP_Command):
    '''
    Pairing Response command (same field layout as the Pairing Request).

    See Bluetooth spec @ Vol 3, Part H - 3.5.2 Pairing Response
    '''
# -----------------------------------------------------------------------------
@SMP_Command.subclass(
    [
        ('confirm_value', 16),
    ]
)
class SMP_Pairing_Confirm_Command(SMP_Command):
    '''
    Pairing Confirm command, carrying a 16-byte confirm value.

    See Bluetooth spec @ Vol 3, Part H - 3.5.3 Pairing Confirm
    '''
# -----------------------------------------------------------------------------
@SMP_Command.subclass(
    [
        ('random_value', 16),
    ]
)
class SMP_Pairing_Random_Command(SMP_Command):
    '''
    Pairing Random command, carrying a 16-byte random value.

    See Bluetooth spec @ Vol 3, Part H - 3.5.4 Pairing Random
    '''
# -----------------------------------------------------------------------------
@SMP_Command.subclass(
    [
        ('reason', {'size': 1, 'mapper': error_name}),
    ]
)
class SMP_Pairing_Failed_Command(SMP_Command):
    '''
    Pairing Failed command, carrying a 1-byte reason code.

    See Bluetooth spec @ Vol 3, Part H - 3.5.5 Pairing Failed
    '''
# -----------------------------------------------------------------------------
@SMP_Command.subclass(
    [
        ('public_key_x', 32),
        ('public_key_y', 32),
    ]
)
class SMP_Pairing_Public_Key_Command(SMP_Command):
    '''
    Pairing Public Key command, carrying two 32-byte coordinates (X then Y).

    See Bluetooth spec @ Vol 3, Part H - 3.5.6 Pairing Public Key
    '''
# -----------------------------------------------------------------------------
@SMP_Command.subclass(
    [
        ('dhkey_check', 16),
    ]
)
class SMP_Pairing_DHKey_Check_Command(SMP_Command):
    '''
    Pairing DHKey Check command, carrying a 16-byte check value.

    See Bluetooth spec @ Vol 3, Part H - 3.5.7 Pairing DHKey Check
    '''
# -----------------------------------------------------------------------------
@SMP_Command.subclass(
    [
        (
            'notification_type',
            {'size': 1, 'mapper': SMP_Command.keypress_notification_type_name},
        ),
    ]
)
class SMP_Pairing_Keypress_Notification_Command(SMP_Command):
    '''
    Keypress Notification command, carrying a 1-byte notification type.

    See Bluetooth spec @ Vol 3, Part H - 3.5.8 Keypress Notification
    '''
# -----------------------------------------------------------------------------
@SMP_Command.subclass(
    [
        ('long_term_key', 16),
    ]
)
class SMP_Encryption_Information_Command(SMP_Command):
    '''
    Encryption Information command, carrying a 16-byte long term key.

    See Bluetooth spec @ Vol 3, Part H - 3.6.2 Encryption Information
    '''
# -----------------------------------------------------------------------------
@SMP_Command.subclass(
    [
        ('ediv', 2),
        ('rand', 8),
    ]
)
class SMP_Master_Identification_Command(SMP_Command):
    '''
    Master Identification command, carrying a 2-byte EDIV and an 8-byte Rand.

    See Bluetooth spec @ Vol 3, Part H - 3.6.3 Master Identification
    '''
# -----------------------------------------------------------------------------
@SMP_Command.subclass(
    [
        ('identity_resolving_key', 16),
    ]
)
class SMP_Identity_Information_Command(SMP_Command):
    '''
    Identity Information command, carrying a 16-byte identity resolving key.

    See Bluetooth spec @ Vol 3, Part H - 3.6.4 Identity Information
    '''
# -----------------------------------------------------------------------------
@SMP_Command.subclass(
    [
        ('addr_type', Address.ADDRESS_TYPE_SPEC),
        ('bd_addr', Address.parse_address_preceded_by_type),
    ]
)
class SMP_Identity_Address_Information_Command(SMP_Command):
    '''
    Identity Address Information command, carrying an address type followed
    by the address itself.

    See Bluetooth spec @ Vol 3, Part H - 3.6.5 Identity Address Information
    '''
# -----------------------------------------------------------------------------
@SMP_Command.subclass(
    [
        ('signature_key', 16),
    ]
)
class SMP_Signing_Information_Command(SMP_Command):
    '''
    Signing Information command, carrying a 16-byte signature key.

    See Bluetooth spec @ Vol 3, Part H - 3.6.6 Signing Information
    '''
# -----------------------------------------------------------------------------
@SMP_Command.subclass(
    [
        ('auth_req', {'size': 1, 'mapper': SMP_Command.auth_req_str}),
    ]
)
class SMP_Security_Request_Command(SMP_Command):
    '''
    Security Request command, carrying a 1-byte AuthReq value.

    See Bluetooth spec @ Vol 3, Part H - 3.6.7 Security Request
    '''
# -----------------------------------------------------------------------------
def smp_auth_req(bonding, mitm, sc, keypress, ct2):
    '''
    Build an AuthReq value by OR-ing together the mask of every option that
    is truthy.
    '''
    options = (
        (bonding, SMP_BONDING_AUTHREQ),
        (mitm, SMP_MITM_AUTHREQ),
        (sc, SMP_SC_AUTHREQ),
        (keypress, SMP_KEYPRESS_AUTHREQ),
        (ct2, SMP_CT2_AUTHREQ),
    )
    value = 0
    for enabled, mask in options:
        if enabled:
            value |= mask
    return value
# -----------------------------------------------------------------------------
class AddressResolver:
    '''
    Resolves addresses against a list of (IRK, identity address) pairs.
    '''

    def __init__(self, resolving_keys):
        # Iterable of (irk, resolved_address) pairs
        self.resolving_keys = resolving_keys

    def resolve(self, address):
        '''
        Return the identity address matching `address`, or None when no key
        in `resolving_keys` matches.

        The first 3 bytes of `bytes(address)` are compared with the result of
        `crypto.ah(irk, prand)`, where `prand` is the next 3 bytes.
        '''
        raw = bytes(address)
        expected_hash, prand = raw[0:3], raw[3:6]

        for irk, resolved_address in self.resolving_keys:
            if crypto.ah(irk, prand) != expected_hash:
                continue

            # Match! Report the address with the corresponding identity type
            is_public = resolved_address.address_type == Address.PUBLIC_DEVICE_ADDRESS
            identity_type = (
                Address.PUBLIC_IDENTITY_ADDRESS if is_public
                else Address.RANDOM_IDENTITY_ADDRESS
            )
            return Address(address=str(resolved_address), address_type=identity_type)

        return None
# -----------------------------------------------------------------------------
class PairingDelegate:
    '''
    Holds the local IO capability and key distribution preferences, and
    provides the async hooks used to interact with the user during pairing.
    The default hook implementations accept everything without interaction.
    '''

    # Aliases for the IO capability values
    NO_OUTPUT_NO_INPUT                = SMP_NO_INPUT_NO_OUTPUT_IO_CAPABILITY
    KEYBOARD_INPUT_ONLY               = SMP_KEYBOARD_ONLY_IO_CAPABILITY
    DISPLAY_OUTPUT_ONLY               = SMP_DISPLAY_ONLY_IO_CAPABILITY
    DISPLAY_OUTPUT_AND_YES_NO_INPUT   = SMP_DISPLAY_YES_NO_IO_CAPABILITY
    DISPLAY_OUTPUT_AND_KEYBOARD_INPUT = SMP_KEYBOARD_DISPLAY_IO_CAPABILITY

    DEFAULT_KEY_DISTRIBUTION = SMP_ENC_KEY_DISTRIBUTION_FLAG | SMP_ID_KEY_DISTRIBUTION_FLAG

    def __init__(
        self,
        io_capability=NO_OUTPUT_NO_INPUT,
        local_initiator_key_distribution=DEFAULT_KEY_DISTRIBUTION,
        local_responder_key_distribution=DEFAULT_KEY_DISTRIBUTION,
    ):
        self.io_capability = io_capability
        self.local_initiator_key_distribution = local_initiator_key_distribution
        self.local_responder_key_distribution = local_responder_key_distribution

    async def accept(self):
        '''Default implementation: always returns True.'''
        return True

    async def compare_numbers(self, number, digits=6):
        '''Default implementation: always returns True.'''
        return True

    async def get_number(self):
        '''Default implementation: always returns 0.'''
        return 0

    async def display_number(self, number, digits=6):
        '''Default implementation: does nothing.'''
        pass

    async def key_distribution_response(self, peer_initiator_key_distribution, peer_responder_key_distribution):
        '''
        Return an (initiator, responder) pair of key distribution masks, each
        being the peer's mask restricted to the bits set in the local one.
        '''
        initiator_mask = peer_initiator_key_distribution & self.local_initiator_key_distribution
        responder_mask = peer_responder_key_distribution & self.local_responder_key_distribution
        return (initiator_mask, responder_mask)
# -----------------------------------------------------------------------------
class PairingConfig:
    '''
    Pairing options: secure connections, MITM protection, bonding, and the
    delegate to use (a default PairingDelegate when none is given).
    '''

    def __init__(self, sc=True, mitm=True, bonding=True, delegate=None):
        self.sc = sc
        self.mitm = mitm
        self.bonding = bonding
        self.delegate = delegate if delegate else PairingDelegate()

    def __str__(self):
        io_capability_str = SMP_Command.io_capability_name(self.delegate.io_capability)
        return f'PairingConfig(sc={self.sc}, mitm={self.mitm}, bonding={self.bonding}, delegate[{io_capability_str}])'
# -----------------------------------------------------------------------------
class Session:
# Pairing method identifiers
JUST_WORKS         = 0
NUMERIC_COMPARISON = 1
PASSKEY            = 2
OOB                = 3

PAIRING_METHOD_NAMES = {
    JUST_WORKS:         'JUST_WORKS',
    NUMERIC_COMPARISON: 'NUMERIC_COMPARISON',
    PASSKEY:            'PASSKEY',
    OOB:                'OOB',
}

# I/O Capability to pairing method decision matrix
#
# See Bluetooth spec @ Vol 3, part H - Table 2.8: Mapping of IO Capabilities to Key Generation Method
#
# Layout: PAIRING_METHODS[initiator_io_capability][responder_io_capability] -> <method>
#
# <method> is one of:
#   - a method ID (same choice for legacy pairing and secure connections)
#   - a 3-element tuple (PASSKEY, initiator_displays, responder_displays), where
#     each flag says whether that side displays the code (True) or inputs it (False)
#   - a 2-element tuple (<legacy method>, <secure connections method>) when the
#     two differ, each element being one of the two forms above
PAIRING_METHODS = {
    SMP_DISPLAY_ONLY_IO_CAPABILITY: {
        SMP_DISPLAY_ONLY_IO_CAPABILITY:       JUST_WORKS,
        SMP_DISPLAY_YES_NO_IO_CAPABILITY:     JUST_WORKS,
        SMP_KEYBOARD_ONLY_IO_CAPABILITY:      (PASSKEY, True, False),
        SMP_NO_INPUT_NO_OUTPUT_IO_CAPABILITY: JUST_WORKS,
        SMP_KEYBOARD_DISPLAY_IO_CAPABILITY:   (PASSKEY, True, False),
    },
    SMP_DISPLAY_YES_NO_IO_CAPABILITY: {
        SMP_DISPLAY_ONLY_IO_CAPABILITY:       JUST_WORKS,
        SMP_DISPLAY_YES_NO_IO_CAPABILITY:     (JUST_WORKS, NUMERIC_COMPARISON),
        SMP_KEYBOARD_ONLY_IO_CAPABILITY:      (PASSKEY, True, False),
        SMP_NO_INPUT_NO_OUTPUT_IO_CAPABILITY: JUST_WORKS,
        SMP_KEYBOARD_DISPLAY_IO_CAPABILITY:   ((PASSKEY, True, False), NUMERIC_COMPARISON),
    },
    SMP_KEYBOARD_ONLY_IO_CAPABILITY: {
        SMP_DISPLAY_ONLY_IO_CAPABILITY:       (PASSKEY, False, True),
        SMP_DISPLAY_YES_NO_IO_CAPABILITY:     (PASSKEY, False, True),
        SMP_KEYBOARD_ONLY_IO_CAPABILITY:      (PASSKEY, False, False),
        SMP_NO_INPUT_NO_OUTPUT_IO_CAPABILITY: JUST_WORKS,
        SMP_KEYBOARD_DISPLAY_IO_CAPABILITY:   (PASSKEY, False, True),
    },
    SMP_NO_INPUT_NO_OUTPUT_IO_CAPABILITY: {
        SMP_DISPLAY_ONLY_IO_CAPABILITY:       JUST_WORKS,
        SMP_DISPLAY_YES_NO_IO_CAPABILITY:     JUST_WORKS,
        SMP_KEYBOARD_ONLY_IO_CAPABILITY:      JUST_WORKS,
        SMP_NO_INPUT_NO_OUTPUT_IO_CAPABILITY: JUST_WORKS,
        SMP_KEYBOARD_DISPLAY_IO_CAPABILITY:   JUST_WORKS,
    },
    SMP_KEYBOARD_DISPLAY_IO_CAPABILITY: {
        SMP_DISPLAY_ONLY_IO_CAPABILITY:       (PASSKEY, False, True),
        SMP_DISPLAY_YES_NO_IO_CAPABILITY:     ((PASSKEY, False, True), NUMERIC_COMPARISON),
        SMP_KEYBOARD_ONLY_IO_CAPABILITY:      (PASSKEY, True, False),
        SMP_NO_INPUT_NO_OUTPUT_IO_CAPABILITY: JUST_WORKS,
        SMP_KEYBOARD_DISPLAY_IO_CAPABILITY:   ((PASSKEY, True, False), NUMERIC_COMPARISON),
    },
}
def __init__(self, manager, connection, pairing_config):
    '''
    Set up the state of a pairing session for `connection`.

    The local role (initiator or responder) is taken from the connection's
    role, and the pairing options from `pairing_config` and its delegate.
    '''
    self.manager    = manager
    self.connection = connection

    # Local keys and values
    self.tk        = bytes(16)
    self.r         = bytes(16)
    self.stk       = None
    self.ltk       = None
    self.ltk_ediv  = 0
    self.ltk_rand  = bytes(8)
    self.link_key  = None
    self.initiator_key_distribution = 0
    self.responder_key_distribution = 0

    # Values received from the peer
    self.peer_random_value           = None
    self.peer_public_key_x           = bytes(32)
    self.peer_public_key_y           = bytes(32)
    self.peer_ltk                    = None
    self.peer_ediv                   = None
    self.peer_rand                   = None
    self.peer_identity_resolving_key = None
    self.peer_bd_addr                = None
    self.peer_signature_key          = None
    self.peer_expected_distributions = []

    # Pairing progress
    self.dh_key                 = None
    self.passkey                = 0
    self.passkey_step           = 0
    self.passkey_display        = False
    self.pairing_method         = 0
    self.pairing_config         = pairing_config
    self.wait_before_continuing = None
    self.completed              = False

    # Decide if we're the initiator or the responder
    self.is_initiator = connection.role == BT_CENTRAL_ROLE
    self.is_responder = not self.is_initiator

    # Listen for connection events
    for event_name, handler in (
        ('disconnection', self.on_disconnection),
        ('connection_encryption_change', self.on_connection_encryption_change),
        ('connection_encryption_key_refresh', self.on_connection_encryption_key_refresh),
    ):
        connection.on(event_name, handler)

    # Only the initiator gets a future that can be awaited for completion
    self.pairing_result = (
        asyncio.get_running_loop().create_future() if self.is_initiator else None
    )

    # Key Distribution (default values before negotiation)
    delegate = pairing_config.delegate
    self.initiator_key_distribution = delegate.local_initiator_key_distribution
    self.responder_key_distribution = delegate.local_responder_key_distribution

    # Authentication Requirements Flags - Vol 3, Part H, Figure 3.3
    self.bonding  = pairing_config.bonding
    self.sc       = pairing_config.sc
    self.mitm     = pairing_config.mitm
    self.keypress = False
    self.ct2      = False

    # I/O Capabilities
    self.io_capability      = pairing_config.delegate.io_capability
    self.peer_io_capability = SMP_NO_INPUT_NO_OUTPUT_IO_CAPABILITY

    # OOB (not supported yet)
    self.oob = False

    # Set up addresses: (ia, iat) is the initiator's address and type flag,
    # (ra, rat) the responder's. The type flag is 1 for a random address.
    peer_address = connection.peer_resolvable_address or connection.peer_address
    local_entry = (bytes(manager.address), 1 if manager.address.is_random else 0)
    peer_entry  = (bytes(peer_address), 1 if peer_address.is_random else 0)
    if self.is_initiator:
        (self.ia, self.iat), (self.ra, self.rat) = local_entry, peer_entry
    else:
        (self.ra, self.rat), (self.ia, self.iat) = local_entry, peer_entry
@property
def pkx(self):
    '''(local, peer) public key X coordinates; the local one is the byte-reversed X of the manager's ECC key.'''
    local_x = bytes(reversed(self.manager.ecc_key.x))
    return (local_x, self.peer_public_key_x)
@property
def pka(self):
    '''Public key X coordinate of the initiator: ours when we initiate, the peer's otherwise.'''
    index = 0 if self.is_initiator else 1
    return self.pkx[index]
@property
def pkb(self):
    '''Public key X coordinate of the responder: ours when we respond, the peer's otherwise.'''
    index = 0 if self.is_responder else 1
    return self.pkx[index]
@property
def nx(self):
    '''(local, peer) random values.'''
    local_random = self.r
    return (local_random, self.peer_random_value)
@property
def na(self):
    '''Random value of the initiator: ours when we initiate, the peer's otherwise.'''
    index = 0 if self.is_initiator else 1
    return self.nx[index]
@property
def nb(self):
    '''Random value of the responder: ours when we respond, the peer's otherwise.'''
    index = 0 if self.is_responder else 1
    return self.nx[index]
@property
def auth_req(self):
    '''AuthReq value built from this session's bonding, mitm, sc, keypress and ct2 settings.'''
    return smp_auth_req(
        self.bonding,
        self.mitm,
        self.sc,
        self.keypress,
        self.ct2,
    )
def get_long_term_key(self, rand, ediv):
    '''
    Return the key to use for the given `rand` and `ediv`.

    With secure connections, or once pairing has completed, this is the LTK.
    Otherwise it is the STK when `rand` and `ediv` match the session's own
    values, and None when they don't.
    '''
    if self.sc or self.completed:
        return self.ltk

    if rand == self.ltk_rand and ediv == self.ltk_ediv:
        return self.stk

    return None
def decide_pairing_method(self, auth_req, initiator_io_capability, responder_io_capability):
    '''
    Select the pairing method, storing it in `self.pairing_method` (and, for
    the passkey method, whether we display the code in `self.passkey_display`).

    Just Works is used when neither side asks for MITM protection; otherwise
    the method is looked up in PAIRING_METHODS from the two IO capabilities.
    '''
    if not self.mitm and (auth_req & SMP_MITM_AUTHREQ) == 0:
        self.pairing_method = self.JUST_WORKS
        return

    entry = self.PAIRING_METHODS[initiator_io_capability][responder_io_capability]

    # A pair holds one entry for legacy pairing and one for secure connections
    if isinstance(entry, tuple) and len(entry) == 2:
        legacy_entry, sc_entry = entry
        entry = sc_entry if self.sc else legacy_entry

    if isinstance(entry, int):
        # Just a method ID
        self.pairing_method = entry
        return

    # PASSKEY method, with a method ID and display/input flags
    method, initiator_displays, responder_displays = entry
    self.pairing_method = method
    self.passkey_display = initiator_displays if self.is_initiator else responder_displays
def check_expected_value(self, expected, received, error):
    '''
    Compare a received value with the expected one.

    Returns True when they are equal. Otherwise sends a Pairing Failed with
    `error` as the reason and returns False.
    '''
    logger.debug(f'expected={expected.hex()} got={received.hex()}')
    if expected == received:
        return True

    logger.info(color('pairing confirm/check mismatch', 'red'))
    self.send_pairing_failed(error)
    return False
def prompt_user_for_numeric_comparison(self, code, next_steps):
    '''
    Ask the delegate, in a background task, to confirm the verification code.

    `next_steps` is called when the delegate's `compare_numbers` returns a
    truthy value. If it returns a falsy value, or if an exception is raised
    (by the delegate or by `next_steps`), a Pairing Failed is sent with
    SMP_CONFIRM_VALUE_FAILED_ERROR as the reason.
    '''
    async def prompt():
        logger.debug(f'verification code: {code}')
        try:
            response = await self.pairing_config.delegate.compare_numbers(code, digits=6)
            if response:
                next_steps()
                return
        except Exception as error:
            # Logger.warn is deprecated (and removed in Python 3.13): calling it
            # here could raise and prevent the failure below from being sent
            logger.warning(f'exception while prompting: {error}')

        self.send_pairing_failed(SMP_CONFIRM_VALUE_FAILED_ERROR)

    asyncio.create_task(prompt())
def prompt_user_for_number(self, next_steps):
    '''
    Ask the delegate, in a background task, for a passkey.

    `next_steps` is called with the value returned by the delegate's
    `get_number`. If an exception is raised (by the delegate or by
    `next_steps`), a Pairing Failed is sent with
    SMP_PASSKEY_ENTRY_FAILED_ERROR as the reason.
    '''
    async def prompt():
        logger.debug('prompting user for passkey')
        try:
            passkey = await self.pairing_config.delegate.get_number()
            logger.debug(f'user input: {passkey}')
            next_steps(passkey)
        except Exception as error:
            # Logger.warn is deprecated (and removed in Python 3.13): calling it
            # here could raise and prevent the failure below from being sent
            logger.warning(f'exception while prompting: {error}')
            self.send_pairing_failed(SMP_PASSKEY_ENTRY_FAILED_ERROR)

    asyncio.create_task(prompt())
def display_passkey(self):
    '''
    Generate a random 6-digit passkey and ask the delegate, in a background
    task, to display it. For legacy pairing, the TK is derived from it.
    '''
    # Generate random Passkey/PIN code
    passkey = secrets.randbelow(1000000)
    self.passkey = passkey
    logger.debug(f'Pairing PIN CODE: {self.passkey:06}')

    # The value of TK is computed from the PIN code
    if not self.sc:
        self.tk = self.passkey.to_bytes(16, byteorder='little')
        logger.debug(f'TK from passkey = {self.tk.hex()}')

    delegate = self.pairing_config.delegate
    asyncio.create_task(delegate.display_number(self.passkey, digits=6))
def input_passkey(self, next_steps=None):
    '''
    Prompt the user for the passkey displayed on the peer, store it, then
    call `next_steps` if one was given. For legacy pairing, the TK is derived
    from the passkey.
    '''
    def on_passkey(passkey):
        self.passkey = passkey

        if not self.sc:
            self.tk = passkey.to_bytes(16, byteorder='little')
            logger.debug(f'TK from passkey = {self.tk.hex()}')

        if next_steps is None:
            return
        next_steps()

    self.prompt_user_for_number(on_passkey)
def display_or_input_passkey(self, next_steps=None):
    '''
    Display a passkey or prompt for one, depending on `self.passkey_display`.

    When displaying, `next_steps` (if given) is called right away. When
    prompting, it is handed over to `input_passkey`.
    '''
    if not self.passkey_display:
        self.input_passkey(next_steps)
        return

    self.display_passkey()
    if next_steps is not None:
        next_steps()
def send_command(self, command):
    '''Send `command` on this session's connection, via the manager.'''
    connection = self.connection
    self.manager.send_command(connection, command)
def send_pairing_failed(self, error):
    '''Send a Pairing Failed command with `error` as the reason, then run the local failure handling.'''
    failure = SMP_Pairing_Failed_Command(reason=error)
    self.send_command(failure)
    self.on_pairing_failure(error)
def send_pairing_request_command(self):
    '''
    Notify the manager that the session starts, then send a Pairing Request
    built from the session's settings. The serialized request is kept in
    `self.preq`.
    '''
    self.manager.on_session_start(self)

    request_fields = {
        'io_capability': self.io_capability,
        'oob_data_flag': 0,
        'auth_req': self.auth_req,
        'maximum_encryption_key_size': 16,
        'initiator_key_distribution': self.initiator_key_distribution,
        'responder_key_distribution': self.responder_key_distribution,
    }
    command = SMP_Pairing_Request_Command(**request_fields)
    self.preq = bytes(command)
    self.send_command(command)
def send_pairing_response_command(self):
    '''
    Send a Pairing Response built from the session's settings. The serialized
    response is kept in `self.pres`.
    '''
    response_fields = {
        'io_capability': self.io_capability,
        'oob_data_flag': 0,
        'auth_req': self.auth_req,
        'maximum_encryption_key_size': 16,
        'initiator_key_distribution': self.initiator_key_distribution,
        'responder_key_distribution': self.responder_key_distribution,
    }
    response = SMP_Pairing_Response_Command(**response_fields)
    self.pres = bytes(response)
    self.send_command(response)
def send_pairing_confirm_command(self):
    '''
    Generate a new local random value and send the Pairing Confirm computed
    from it: with `crypto.f4` for secure connections, with `crypto.c1` for
    legacy pairing.

    With secure connections, nothing is sent when the pairing method is not
    one of JUST_WORKS, NUMERIC_COMPARISON or PASSKEY.
    '''
    self.r = crypto.r()
    logger.debug(f'generated random: {self.r.hex()}')

    if not self.sc:
        confirm_value = crypto.c1(
            self.tk,
            self.r,
            self.preq,
            self.pres,
            self.iat,
            self.rat,
            self.ia,
            self.ra
        )
    else:
        if self.pairing_method in (self.JUST_WORKS, self.NUMERIC_COMPARISON):
            z = 0
        elif self.pairing_method == self.PASSKEY:
            # 0x80 plus the bit of the passkey selected by the current step
            z = 0x80 + ((self.passkey >> self.passkey_step) & 1)
        else:
            return

        # Our own public key comes first
        if self.is_initiator:
            first_key, second_key = self.pka, self.pkb
        else:
            first_key, second_key = self.pkb, self.pka
        confirm_value = crypto.f4(first_key, second_key, self.r, bytes([z]))

    self.send_command(SMP_Pairing_Confirm_Command(confirm_value=confirm_value))
def send_pairing_random_command(self):
    '''Send a Pairing Random command carrying the local random value `self.r`.'''
    command = SMP_Pairing_Random_Command(random_value=self.r)
    self.send_command(command)
def send_public_key_command(self):
    '''Send a Pairing Public Key command with the byte-reversed X and Y of the manager's ECC key.'''
    ecc_key = self.manager.ecc_key
    x = bytes(reversed(ecc_key.x))
    y = bytes(reversed(ecc_key.y))
    self.send_command(SMP_Pairing_Public_Key_Command(public_key_x=x, public_key_y=y))
def send_pairing_dhkey_check_command(self):
    '''Send a Pairing DHKey Check command carrying `self.ea` (initiator) or `self.eb` (responder).'''
    if self.is_initiator:
        check_value = self.ea
    else:
        check_value = self.eb
    self.send_command(SMP_Pairing_DHKey_Check_Command(dhkey_check=check_value))
def start_encryption(self, key):
    '''
    Ask the controller, in a background task, to start encrypting the
    connection with `key` (with a zero random number and diversifier).
    '''
    # We can now encrypt the connection with the short term key, so that we can
    # distribute the long term and/or other keys over an encrypted connection
    command = HCI_LE_Start_Encryption_Command(
        connection_handle=self.connection.handle,
        random_number=bytes(8),
        encrypted_diversifier=0,
        long_term_key=key
    )
    host = self.manager.device.host
    asyncio.create_task(host.send_command(command))
def distribute_keys(self):
    '''
    Send the local keys selected by the key distribution mask of our role
    (initiator or responder), and derive the BR/EDR link key when the mask
    asks for it.
    '''
    # Both roles follow the same steps, only the mask to look at differs
    if self.is_initiator:
        key_distribution = self.initiator_key_distribution
    else:
        key_distribution = self.responder_key_distribution

    # Distribute the LTK, EDIV and RAND (legacy pairing only)
    if not self.sc and key_distribution & SMP_ENC_KEY_DISTRIBUTION_FLAG:
        self.send_command(SMP_Encryption_Information_Command(long_term_key=self.ltk))
        self.send_command(SMP_Master_Identification_Command(ediv=self.ltk_ediv, rand=self.ltk_rand))

    # Distribute IRK & BD ADDR
    if key_distribution & SMP_ID_KEY_DISTRIBUTION_FLAG:
        self.send_command(
            SMP_Identity_Information_Command(identity_resolving_key=self.manager.device.irk)
        )
        self.send_command(
            SMP_Identity_Address_Information_Command(
                addr_type=self.manager.address.address_type,
                bd_addr=self.manager.address
            )
        )

    # Distribute CSRK
    csrk = bytes(16)  # FIXME: testing
    if key_distribution & SMP_SIGN_KEY_DISTRIBUTION_FLAG:
        self.send_command(SMP_Signing_Information_Command(signature_key=csrk))

    # CTKD, calculate BR/EDR link key
    if key_distribution & SMP_LINK_KEY_DISTRIBUTION_FLAG:
        if self.ct2:
            ilk = crypto.h7(salt=SMP_CTKD_H7_LEBR_SALT, w=self.ltk)
        else:
            ilk = crypto.h6(self.ltk, b'tmp1')
        self.link_key = crypto.h6(ilk, b'lebr')
def compute_peer_expected_distributions(self, key_distribution_flags):
    '''
    Set `self.peer_expected_distributions` to the list of command classes
    that the peer is expected to send during the key distribution phase,
    according to `key_distribution_flags`.
    '''
    expected = []

    # Encryption keys are only expected with legacy pairing
    if not self.sc and key_distribution_flags & SMP_ENC_KEY_DISTRIBUTION_FLAG:
        expected += [SMP_Encryption_Information_Command, SMP_Master_Identification_Command]

    if key_distribution_flags & SMP_ID_KEY_DISTRIBUTION_FLAG:
        expected += [SMP_Identity_Information_Command, SMP_Identity_Address_Information_Command]

    if key_distribution_flags & SMP_SIGN_KEY_DISTRIBUTION_FLAG:
        expected += [SMP_Signing_Information_Command]

    self.peer_expected_distributions = expected
    logger.debug(f'expecting distributions: {[c.__name__ for c in self.peer_expected_distributions]}')
def check_key_distribution(self, command_class):
    '''
    Account for a key distribution command of class `command_class` received
    from the peer.

    A Pairing Failed (SMP_UNSPECIFIED_REASON_ERROR) is sent when the
    connection is not encrypted, or when the command is not one of those
    still expected. When the last expected command has been received, the
    initiator distributes its own keys, and pairing is completed.
    '''
    # First, check that the connection is encrypted
    if not self.connection.is_encrypted:
        # NOTE: Logger.warning is used rather than the deprecated Logger.warn,
        # which is removed in Python 3.13
        logger.warning(color('received key distribution on a non-encrypted connection', 'red'))
        self.send_pairing_failed(SMP_UNSPECIFIED_REASON_ERROR)
        return

    # Check that this command class is expected
    if command_class not in self.peer_expected_distributions:
        logger.warning(color(f'!!! unexpected key distribution command: {command_class.__name__}', 'red'))
        self.send_pairing_failed(SMP_UNSPECIFIED_REASON_ERROR)
        return

    self.peer_expected_distributions.remove(command_class)
    logger.debug(f'remaining distributions: {[c.__name__ for c in self.peer_expected_distributions]}')
    if not self.peer_expected_distributions:
        # The initiator can now send its keys
        if self.is_initiator:
            self.distribute_keys()

        # Nothing left to expect, we're done
        self.on_pairing()
async def pair(self):
    '''
    Start pairing as an initiator: send the Pairing Request, then wait for
    the `pairing_result` future to complete.
    '''
    # TODO: check that this session isn't already active

    # Send the pairing request to start the process
    self.send_pairing_request_command()

    # Wait for the pairing process to finish
    pending_result = self.pairing_result
    await pending_result
def on_disconnection(self, reason):
    '''
    Detach this session from its connection's events and tell the manager
    that the session has ended. `reason` is not used.
    '''
    registered_handlers = (
        ('disconnection', self.on_disconnection),
        ('connection_encryption_change', self.on_connection_encryption_change),
        ('connection_encryption_key_refresh', self.on_connection_encryption_key_refresh)
    )
    for event_name, event_handler in registered_handlers:
        self.connection.remove_listener(event_name, event_handler)

    self.manager.on_session_end(self)
def on_connection_encryption_change(self):
    '''
    React to a change of the connection's encryption state.

    The responder distributes its keys first (the initiator does so later),
    so keys are only sent from here when the connection is encrypted and
    this session is the responder.
    '''
    if self.connection.is_encrypted and self.is_responder:
        self.distribute_keys()
def on_connection_encryption_key_refresh(self):
    '''
    Handle an encryption key refresh the same way as an encryption change.
    '''
    handle_as_encryption_change = self.on_connection_encryption_change
    handle_as_encryption_change()
def on_pairing(self):
    '''
    Finish a successful pairing.

    Marks the session as completed (doing nothing more if it already was),
    resolves `pairing_result` if it is still pending, collects the keys
    obtained during pairing into a PairingKeys object and hands them to the
    manager.
    '''
    logger.debug('pairing complete')

    # The outcome is only reported once
    if self.completed:
        return
    self.completed = True

    pending_result = self.pairing_result
    if pending_result is not None and not pending_result.done():
        pending_result.set_result(None)

    # Use the peer address from the pairing protocol or the connection
    peer_address = self.peer_bd_addr or self.connection.peer_address

    # Create an object to hold the keys
    keys = PairingKeys()
    keys.address_type = peer_address.address_type
    authenticated = self.pairing_method != self.JUST_WORKS

    def make_key(value, **extra):
        # All the keys share the same authentication status
        return PairingKeys.Key(value = value, authenticated = authenticated, **extra)

    if self.sc:
        keys.ltk = make_key(self.ltk)
    else:
        our_ltk_key = make_key(self.ltk, ediv = self.ltk_ediv, rand = self.ltk_rand)
        peer_ltk_key = make_key(self.peer_ltk, ediv = self.peer_ediv, rand = self.peer_rand)
        if self.is_initiator:
            keys.ltk_central, keys.ltk_peripheral = peer_ltk_key, our_ltk_key
        else:
            keys.ltk_central, keys.ltk_peripheral = our_ltk_key, peer_ltk_key

    # Keys that are only stored when a value is available
    optional_keys = (
        ('irk', self.peer_identity_resolving_key),
        ('csrk', self.peer_signature_key),
        ('link_key', self.link_key)
    )
    for key_name, key_value in optional_keys:
        if key_value is not None:
            setattr(keys, key_name, make_key(key_value))

    self.manager.on_pairing(self, peer_address, keys)
def on_pairing_failure(self, reason):
    '''
    Finish a failed pairing.

    `reason` is the SMP error code for the failure. The failure is always
    logged; the rest only happens the first time the session completes:
    `pairing_result`, if still pending, receives a ProtocolError carrying
    the reason, and the manager is notified.
    '''
    logger.warning(f'pairing failure ({error_name(reason)})')

    # The outcome is only reported once
    if self.completed:
        return
    self.completed = True

    error = ProtocolError(reason, 'smp', error_name(reason))
    if self.pairing_result is not None and not self.pairing_result.done():
        self.pairing_result.set_exception(error)
    self.manager.on_pairing_failure(self, reason)
def on_smp_command(self, command):
    '''
    Dispatch a received SMP command to the method named after it.

    The handler is looked up as `on_<lowercased command name>`. If the
    handler raises, the exception is logged and a Pairing Failed command
    with SMP_UNSPECIFIED_REASON_ERROR is sent to the peer.
    '''
    handler = getattr(self, f'on_{command.name.lower()}', None)
    if handler is None:
        logger.error(color('SMP command not handled???', 'red'))
        return

    try:
        handler(command)
    except Exception as error:
        logger.warning(f'{color("!!! Exception in handler:", "red")} {error}')
        failure = SMP_Pairing_Failed_Command(reason = SMP_UNSPECIFIED_REASON_ERROR)
        self.send_command(failure)
def on_smp_pairing_request_command(self, command):
    '''
    Handle a Pairing Request by scheduling the async handler as a task,
    without waiting for it to finish.
    '''
    handling = self.on_smp_pairing_request_command_async(command)
    asyncio.create_task(handling)
async def on_smp_pairing_request_command_async(self, command):
    '''
    Process a Pairing Request received from the peer.

    The pairing delegate is asked whether to accept the request and which
    keys to distribute. When the request is acceptable, the pairing method
    is decided, the manager is told that the session is starting, and a
    Pairing Response is sent. When it is not (rejected by the delegate, or
    the peer set its OOB data flag), a Pairing Failed is sent instead.
    '''
    # Check if the request should proceed
    accepted = await self.pairing_config.delegate.accept()
    if not accepted:
        logger.debug('pairing rejected by delegate')
        self.send_pairing_failed(SMP_PAIRING_NOT_SUPPORTED_ERROR)
        return

    # Save the request
    self.preq = bytes(command)

    # Bonding and SC require both sides to request/support it
    self.bonding = self.bonding and (command.auth_req & SMP_BONDING_AUTHREQ != 0)
    self.sc = self.sc and (command.auth_req & SMP_SC_AUTHREQ != 0)
    self.ct2 = self.ct2 and (command.auth_req & SMP_CT2_AUTHREQ != 0)

    # Check for OOB
    if command.oob_data_flag != 0:
        # Report the failure through the same helper as the rejection above
        self.send_pairing_failed(SMP_OOB_NOT_AVAILABLE_ERROR)
        return

    # Decide which pairing method to use
    self.decide_pairing_method(
        command.auth_req,
        command.io_capability,
        self.io_capability
    )
    logger.debug(f'pairing method: {self.PAIRING_METHOD_NAMES[self.pairing_method]}')

    # Key distribution
    self.initiator_key_distribution, self.responder_key_distribution = await self.pairing_config.delegate.key_distribution_response(
        command.initiator_key_distribution, command.responder_key_distribution)
    self.compute_peer_expected_distributions(self.initiator_key_distribution)

    # The pairing is now starting
    self.manager.on_session_start(self)

    # Display a passkey if we need to
    if not self.sc:
        if self.pairing_method == self.PASSKEY and self.passkey_display:
            self.display_passkey()

    # Respond
    self.send_pairing_response_command()
def on_smp_pairing_response_command(self, command):
    '''
    Process a Pairing Response received from the peer.

    The response is ignored (with a warning) when this session is the
    responder. Otherwise the negotiated options are updated, the pairing
    method is decided, and phase 2 is started. A Pairing Failed is sent if
    the peer set its OOB data flag while `sc` is set, or if the key
    distribution flags in the response are not a subset of those requested.
    '''
    if self.is_responder:
        logger.warning(color('received pairing response as a responder', 'red'))
        return

    # Save the response
    self.pres = bytes(command)
    self.peer_io_capability = command.io_capability

    # Bonding and SC require both sides to request/support it
    self.bonding = self.bonding and (command.auth_req & SMP_BONDING_AUTHREQ != 0)
    self.sc = self.sc and (command.auth_req & SMP_SC_AUTHREQ != 0)

    # Check for OOB
    if self.sc and command.oob_data_flag:
        self.send_pairing_failed(SMP_OOB_NOT_AVAILABLE_ERROR)
        return

    # Decide which pairing method to use
    self.decide_pairing_method(
        command.auth_req,
        self.io_capability,
        command.io_capability
    )
    logger.debug(f'pairing method: {self.PAIRING_METHOD_NAMES[self.pairing_method]}')

    # Key distribution
    if (command.initiator_key_distribution & ~self.initiator_key_distribution != 0) or \
       (command.responder_key_distribution & ~self.responder_key_distribution != 0):
        # The response isn't a subset of the request
        self.send_pairing_failed(SMP_INVALID_PARAMETERS_ERROR)
        return
    self.initiator_key_distribution = command.initiator_key_distribution
    self.responder_key_distribution = command.responder_key_distribution
    self.compute_peer_expected_distributions(self.responder_key_distribution)

    # Start phase 2
    if self.sc:
        if self.pairing_method == self.PASSKEY and self.passkey_display:
            self.display_passkey()

        self.send_public_key_command()
    else:
        if self.pairing_method == self.PASSKEY:
            self.display_or_input_passkey(self.send_pairing_confirm_command)
        else:
            self.send_pairing_confirm_command()
def on_smp_pairing_confirm_command_legacy(self, command):
    '''
    Continue a legacy pairing after the peer's confirm value was received.

    The initiator answers with its random value. The responder answers with
    its own confirm value, after asking for the passkey first when the
    method is PASSKEY and the passkey is not displayed on this side.
    '''
    if self.is_initiator:
        self.send_pairing_random_command()
        return

    must_input_passkey = self.pairing_method == self.PASSKEY and not self.passkey_display
    if must_input_passkey:
        # If the method is PASSKEY, now is the time to input the code
        self.input_passkey(self.send_pairing_confirm_command)
    else:
        self.send_pairing_confirm_command()
def on_smp_pairing_confirm_command_secure_connections(self, command):
    '''
    Continue a Secure Connections pairing after the peer's confirm value
    was received.

    With JUST_WORKS or NUMERIC_COMPARISON, only the initiator acts: it picks
    a new random value and sends it. With PASSKEY, the initiator sends its
    random value and the responder sends its confirm value. Other methods
    are ignored.
    '''
    method = self.pairing_method

    if method in (self.JUST_WORKS, self.NUMERIC_COMPARISON):
        if self.is_initiator:
            self.r = crypto.r()
            self.send_pairing_random_command()
        return

    if method == self.PASSKEY:
        reply = self.send_pairing_random_command if self.is_initiator else self.send_pairing_confirm_command
        reply()
def on_smp_pairing_confirm_command(self, command):
    '''
    Remember the peer's confirm value, then continue with the Secure
    Connections or legacy flow depending on `sc`.
    '''
    self.confirm_value = command.confirm_value

    if self.sc:
        continue_pairing = self.on_smp_pairing_confirm_command_secure_connections
    else:
        continue_pairing = self.on_smp_pairing_confirm_command_legacy
    continue_pairing(command)
def on_smp_pairing_random_command_legacy(self, command):
    '''
    Continue a legacy pairing after the peer's random value was received.

    The peer's random value is checked against the confirm value received
    earlier. When they agree, the STK is computed and a new LTK is
    generated; the initiator then starts encryption with the STK, while the
    responder sends its own random value.
    '''
    peer_random = command.random_value

    # Check that the confirmation values match
    confirm_verifier = crypto.c1(
        self.tk, peer_random, self.preq, self.pres, self.iat, self.rat, self.ia, self.ra
    )
    values_match = self.check_expected_value(
        self.confirm_value, confirm_verifier, SMP_CONFIRM_VALUE_FAILED_ERROR
    )
    if not values_match:
        return

    # Compute STK: the initiator's random value is `mrand` and the
    # responder's is `srand`
    if self.is_initiator:
        mrand, srand = self.r, peer_random
    else:
        mrand, srand = peer_random, self.r
    stk = crypto.s1(self.tk, srand, mrand)
    logger.debug(f'STK = {stk.hex()}')

    # Generate LTK
    self.ltk = crypto.r()

    if not self.is_initiator:
        self.send_pairing_random_command()
        return
    self.start_encryption(stk)
def on_smp_pairing_random_command_secure_connections(self, command):
    '''
    Continue a Secure Connections pairing after the peer's random value was
    received.

    Depending on the role and the pairing method, the peer's random value
    is first checked against the confirm value received earlier, and the
    passkey exchange is advanced by one step. While passkey steps remain,
    the method returns early. Otherwise, the MacKey and LTK are computed,
    followed by the two DH key check values (`ea` and `eb`), and the
    exchange continues, after a user confirmation when the method is
    NUMERIC_COMPARISON.

    Pairing methods other than JUST_WORKS, NUMERIC_COMPARISON and PASSKEY
    are ignored.
    '''
    if self.is_initiator:
        if self.pairing_method == self.JUST_WORKS or self.pairing_method == self.NUMERIC_COMPARISON:
            # Check that the random value matches what was committed to earlier
            confirm_verifier = crypto.f4(
                self.pkb,
                self.pka,
                command.random_value,
                bytes([0])
            )
            if not self.check_expected_value(
                self.confirm_value,
                confirm_verifier,
                SMP_CONFIRM_VALUE_FAILED_ERROR
            ):
                return
        elif self.pairing_method == self.PASSKEY:
            # Check that the random value matches what was committed to earlier
            # (the last input is 0x80 plus the passkey bit for the current step)
            confirm_verifier = crypto.f4(
                self.pkb,
                self.pka,
                command.random_value,
                bytes([0x80 + ((self.passkey >> self.passkey_step) & 1)])
            )
            if not self.check_expected_value(
                self.confirm_value,
                confirm_verifier,
                SMP_CONFIRM_VALUE_FAILED_ERROR
            ):
                return

            # Move on to the next iteration
            self.passkey_step += 1
            logger.debug(f'passkey finished step {self.passkey_step} of 20')
            if self.passkey_step < 20:
                # More steps to go: start the next one with a new confirm value
                self.send_pairing_confirm_command()
                return
        else:
            # No other pairing method is handled here
            return
    else:
        if self.pairing_method == self.JUST_WORKS or self.pairing_method == self.NUMERIC_COMPARISON:
            # Nothing to verify on this side: just send our random value
            self.send_pairing_random_command()
        elif self.pairing_method == self.PASSKEY:
            # Check that the random value matches what was committed to earlier
            # (same as for the initiator, but with pka and pkb swapped)
            confirm_verifier = crypto.f4(
                self.pka,
                self.pkb,
                command.random_value,
                bytes([0x80 + ((self.passkey >> self.passkey_step) & 1)])
            )
            if not self.check_expected_value(
                self.confirm_value,
                confirm_verifier,
                SMP_CONFIRM_VALUE_FAILED_ERROR
            ):
                return

            self.send_pairing_random_command()

            # Move on to the next iteration
            self.passkey_step += 1
            logger.debug(f'passkey finished step {self.passkey_step} of 20')
            if self.passkey_step < 20:
                # More steps to go: pick a new random value for the next one
                self.r = crypto.r()
                return
        else:
            # No other pairing method is handled here
            return

    # Compute the MacKey and LTK
    # (a and b are each an address followed by one address type byte)
    a = self.ia + bytes([self.iat])
    b = self.ra + bytes([self.rat])
    (mac_key, self.ltk) = crypto.f5(self.dh_key, self.na, self.nb, a, b)

    # Compute the DH Key checks
    if self.pairing_method == self.JUST_WORKS or self.pairing_method == self.NUMERIC_COMPARISON:
        # 16 zero bytes
        ra = bytes(16)
        rb = ra
    elif self.pairing_method == self.PASSKEY:
        # The passkey, as a 16-byte little-endian value
        ra = self.passkey.to_bytes(16, byteorder='little')
        rb = ra
    else:
        # OOB not implemented yet
        return

    # Bytes 1 to 3 of the saved pairing request and response
    # NOTE(review): presumably the IO capability, OOB flag and AuthReq
    # fields - confirm against the command layout
    io_cap_a = self.preq[1:4]
    io_cap_b = self.pres[1:4]
    self.ea = crypto.f6(mac_key, self.na, self.nb, rb, io_cap_a, a, b)
    self.eb = crypto.f6(mac_key, self.nb, self.na, ra, io_cap_b, b, a)

    # Next steps to be performed after possible user confirmation
    def next_steps():
        # The initiator sends the DH Key check to the responder
        if self.is_initiator:
            self.send_pairing_dhkey_check_command()
        else:
            # Unblock whoever is waiting on the user confirmation, if anyone
            if self.wait_before_continuing:
                self.wait_before_continuing.set_result(None)

    # Prompt the user for confirmation if needed
    if self.pairing_method == self.JUST_WORKS or self.pairing_method == self.NUMERIC_COMPARISON:
        # Compute the 6-digit code
        code = crypto.g2(self.pka, self.pkb, self.na, self.nb) % 1000000

        if self.pairing_method == self.NUMERIC_COMPARISON:
            # Ask for user confirmation
            # (the future is resolved by next_steps() on the responder side)
            self.wait_before_continuing = asyncio.get_running_loop().create_future()
            self.prompt_user_for_numeric_comparison(code, next_steps)
        else:
            next_steps()
    else:
        next_steps()
def on_smp_pairing_random_command(self, command):
    '''
    Remember the peer's random value, then continue with the Secure
    Connections or legacy flow depending on `sc`.
    '''
    self.peer_random_value = command.random_value

    if self.sc:
        continue_pairing = self.on_smp_pairing_random_command_secure_connections
    else:
        continue_pairing = self.on_smp_pairing_random_command_legacy
    continue_pairing(command)
def on_smp_pairing_public_key_command(self, command):
    '''
    Process the peer's public key.

    The key is stored and the DH key is computed from it (each coordinate,
    as well as the result, is byte-reversed around the call to the ECC
    key). The initiator then only acts for the PASSKEY method, by sending
    its confirm value, after the passkey was input if needed. The responder
    sends its own public key back, followed by its confirm value for the
    JUST_WORKS and NUMERIC_COMPARISON methods.
    '''
    # Store the public key so that we can compute the confirmation value later
    self.peer_public_key_x = command.public_key_x
    self.peer_public_key_y = command.public_key_y

    # Compute the DH key
    compute_dh = self.manager.ecc_key.dh
    peer_x = bytes(reversed(command.public_key_x))
    peer_y = bytes(reversed(command.public_key_y))
    self.dh_key = bytes(reversed(compute_dh(peer_x, peer_y)))
    logger.debug(f'DH key: {self.dh_key.hex()}')

    if self.is_initiator:
        if self.pairing_method != self.PASSKEY:
            return
        if self.passkey_display:
            self.send_pairing_confirm_command()
        else:
            self.input_passkey(self.send_pairing_confirm_command)
        return

    # Send our public key back to the initiator
    if self.pairing_method == self.PASSKEY:
        self.display_or_input_passkey(self.send_public_key_command)
    else:
        self.send_public_key_command()

    if self.pairing_method in (self.JUST_WORKS, self.NUMERIC_COMPARISON):
        # We can now send the confirmation value
        self.send_pairing_confirm_command()
def on_smp_pairing_dhkey_check_command(self, command):
    '''
    Process the peer's DH key check.

    The received value is compared with the one computed earlier (`eb` for
    the initiator, `ea` for the responder). When they agree, the initiator
    starts encryption with the LTK, and the responder sends its own DH key
    check, after `wait_before_continuing` has completed if it is set.
    '''
    # Check that what we received matches what we computed earlier
    expected = self.eb if self.is_initiator else self.ea
    check_passed = self.check_expected_value(
        expected, command.dhkey_check, SMP_DHKEY_CHECK_FAILED_ERROR
    )
    if not check_passed:
        return

    if not self.is_responder:
        self.start_encryption(self.ltk)
        return

    if self.wait_before_continuing is None:
        self.send_pairing_dhkey_check_command()
        return

    # Something must complete first: reply from a task once it has
    async def reply_when_ready():
        await self.wait_before_continuing
        self.wait_before_continuing = None
        self.send_pairing_dhkey_check_command()

    asyncio.create_task(reply_when_ready())
def on_smp_pairing_failed_command(self, command):
    '''
    Handle a Pairing Failed command by failing the pairing with the reason
    given by the peer.
    '''
    report_failure = self.on_pairing_failure
    report_failure(command.reason)
def on_smp_encryption_information_command(self, command):
    '''
    Store the long term key sent by the peer, then account for this command
    in the key distribution.
    '''
    received_ltk = command.long_term_key
    self.peer_ltk = received_ltk
    self.check_key_distribution(SMP_Encryption_Information_Command)
def on_smp_master_identification_command(self, command):
    '''
    Store the EDIV and Rand values sent by the peer, then account for this
    command in the key distribution.
    '''
    self.peer_ediv, self.peer_rand = command.ediv, command.rand
    self.check_key_distribution(SMP_Master_Identification_Command)
def on_smp_identity_information_command(self, command):
    '''
    Store the identity resolving key sent by the peer, then account for this
    command in the key distribution.
    '''
    received_irk = command.identity_resolving_key
    self.peer_identity_resolving_key = received_irk
    self.check_key_distribution(SMP_Identity_Information_Command)
def on_smp_identity_address_information_command(self, command):
    '''
    Store the address sent by the peer, then account for this command in
    the key distribution.
    '''
    received_address = command.bd_addr
    self.peer_bd_addr = received_address
    self.check_key_distribution(SMP_Identity_Address_Information_Command)
def on_smp_signing_information_command(self, command):
    '''
    Store the signature key sent by the peer, then account for this command
    in the key distribution.
    '''
    received_signature_key = command.signature_key
    self.peer_signature_key = received_signature_key
    self.check_key_distribution(SMP_Signing_Information_Command)
# -----------------------------------------------------------------------------
class Manager(EventEmitter):
    '''
    Implements the Initiator and Responder roles of the Security Manager Protocol
    '''

    def __init__(self, device, address):
        super().__init__()
        self.device = device
        self.address = address

        # Sessions, keyed by connection handle
        self.sessions = {}

        # Generated the first time the `ecc_key` property is read
        self._ecc_key = None

        # Called with a connection to obtain the pairing configuration to use
        # for it. When it returns None, incoming SMP PDUs on a connection
        # without a session are answered with a Pairing Failed.
        self.pairing_config_factory = lambda connection: PairingConfig()

    def send_command(self, connection, command):
        '''
        Send an SMP command as an L2CAP PDU on the SMP channel of a connection.
        '''
        logger.debug(f'>>> Sending SMP Command on connection [0x{connection.handle:04X}] {connection.peer_address}: {command}')
        connection.send_l2cap_pdu(SMP_CID, command.to_bytes())

    def on_smp_pdu(self, connection, pdu):
        '''
        Parse an SMP PDU received on a connection and pass the resulting
        command to the session for that connection, creating the session
        first if there isn't one yet.
        '''
        # Look for a session with this connection, and create one if none exists
        if not (session := self.sessions.get(connection.handle)):
            pairing_config = self.pairing_config_factory(connection)
            if pairing_config is None:
                # Pairing disabled
                self.send_command(
                    connection,
                    SMP_Pairing_Failed_Command(
                        reason = SMP_PAIRING_NOT_SUPPORTED_ERROR
                    )
                )
                return
            session = Session(self, connection, pairing_config)
            self.sessions[connection.handle] = session

        # Parse the L2CAP payload into an SMP Command object
        command = SMP_Command.from_bytes(pdu)
        logger.debug(f'<<< Received SMP Command on connection [0x{connection.handle:04X}] {connection.peer_address}: {command}')

        # Delegate the handling of the command to the session
        session.on_smp_command(command)

    @property
    def ecc_key(self):
        '''
        The ECC key of this manager, generated on first access.
        '''
        if self._ecc_key is None:
            self._ecc_key = crypto.EccKey.generate()
        return self._ecc_key

    async def pair(self, connection):
        '''
        Pair with the peer of a connection, as an initiator.

        A new session is created for the connection, replacing any session
        already registered for it. Raises ValueError when the pairing config
        factory returns None for the connection.
        '''
        # TODO: check if there's already a session for this connection
        pairing_config = self.pairing_config_factory(connection)
        if pairing_config is None:
            raise ValueError('pairing config must not be None when initiating')
        session = Session(self, connection, pairing_config)
        self.sessions[connection.handle] = session
        return await session.pair()

    def request_pairing(self, connection):
        '''
        Send a Security Request to the peer of a connection.

        The auth_req field is derived from the pairing config for the
        connection, or is 0 when there is no pairing config.
        '''
        pairing_config = self.pairing_config_factory(connection)
        if pairing_config:
            auth_req = smp_auth_req(
                pairing_config.bonding,
                pairing_config.mitm,
                pairing_config.sc,
                False,
                False
            )
        else:
            auth_req = 0
        self.send_command(connection, SMP_Security_Request_Command(auth_req=auth_req))

    def on_session_start(self, session):
        '''
        Notify the device that pairing has started on the session's connection.
        '''
        self.device.on_pairing_start(session.connection.handle)

    def on_pairing(self, session, identity_address, keys):
        '''
        Handle the successful pairing of a session.

        When the device has a key store and an identity address is known, the
        keys are stored under that address by a background task. The device is
        then notified, without waiting for the keys to be stored.
        '''
        # Store the keys in the key store
        if self.device.keystore and identity_address is not None:
            async def store_keys():
                try:
                    await self.device.keystore.update(str(identity_address), keys)
                except Exception as error:
                    # Storing is best-effort: a failure is only logged
                    logger.warning(f'!!! error while storing keys: {error}')

            asyncio.create_task(store_keys())

        # Notify the device
        self.device.on_pairing(session.connection.handle, keys)

    def on_pairing_failure(self, session, reason):
        '''
        Notify the device that pairing failed on the session's connection.
        '''
        self.device.on_pairing_failure(session.connection.handle, reason)

    def on_session_end(self, session):
        '''
        Forget the session registered for the session's connection, if any.
        '''
        logger.debug(f'session end for connection 0x{session.connection.handle:04X}')
        if session.connection.handle in self.sessions:
            del self.sessions[session.connection.handle]

    def get_long_term_key(self, connection, rand, ediv):
        '''
        Ask the session for a connection for its long term key.

        Returns None when there is no session for the connection.
        '''
        if session := self.sessions.get(connection.handle):
            return session.get_long_term_key(rand, ediv)
        return None