blob: f51cfe6553cd12f5a29e0e228d1cb0f4c42434d0 [file] [log] [blame]
# Copyright 2021-2022 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# -----------------------------------------------------------------------------
# Keys and Key Storage
#
# -----------------------------------------------------------------------------
# -----------------------------------------------------------------------------
# Imports
# -----------------------------------------------------------------------------
import logging
import os
import json
from colors import color
from .hci import Address
# -----------------------------------------------------------------------------
# Logging
# -----------------------------------------------------------------------------
logger = logging.getLogger(__name__)
# -----------------------------------------------------------------------------
class PairingKeys:
    """Set of keys stored for one entry of a key store.

    Holds an optional ``address_type`` and up to six optional keys (``ltk``,
    ``ltk_central``, ``ltk_peripheral``, ``irk``, ``csrk`` and ``link_key``),
    each an instance of `PairingKeys.Key`. Instances convert to and from plain
    dicts made of JSON-compatible values.
    """

    class Key:
        """A key value together with the metadata serialized alongside it."""

        def __init__(self, value, authenticated=False, ediv=None, rand=None):
            """
            Args:
                value: key material, as bytes (serialized as a hex string).
                authenticated: flag stored with the key, False by default.
                ediv: optional value stored as-is when not None.
                rand: optional bytes, serialized as a hex string when not None.
            """
            self.value = value
            self.authenticated = authenticated
            self.ediv = ediv
            self.rand = rand

        @classmethod
        def from_dict(cls, key_dict):
            """Build a Key from the dict form produced by `to_dict`.

            Raises:
                KeyError: if `key_dict` has no 'value' entry.
                ValueError: if 'value' or 'rand' is not a valid hex string.
            """
            value = bytes.fromhex(key_dict['value'])
            authenticated = key_dict.get('authenticated', False)
            ediv = key_dict.get('ediv')
            rand = key_dict.get('rand')
            if rand is not None:
                rand = bytes.fromhex(rand)
            return cls(value, authenticated, ediv, rand)

        def to_dict(self):
            """Return the dict form of this key; None-valued ediv/rand are omitted."""
            key_dict = {'value': self.value.hex(), 'authenticated': self.authenticated}
            if self.ediv is not None:
                key_dict['ediv'] = self.ediv
            if self.rand is not None:
                key_dict['rand'] = self.rand.hex()
            return key_dict

    # Attribute names (also used as dict keys) of the entries that hold a Key,
    # in the order in which to_dict() emits them.
    _KEY_NAMES = ('ltk', 'ltk_central', 'ltk_peripheral', 'irk', 'csrk', 'link_key')

    def __init__(self):
        self.address_type = None
        self.ltk = None
        self.ltk_central = None
        self.ltk_peripheral = None
        self.irk = None
        self.csrk = None
        self.link_key = None  # Classic

    @staticmethod
    def key_from_dict(keys_dict, key_name):
        """Return the Key stored under `key_name` in `keys_dict`, or None if absent."""
        key_dict = keys_dict.get(key_name)
        if key_dict is None:
            return None
        return PairingKeys.Key.from_dict(key_dict)

    @staticmethod
    def from_dict(keys_dict):
        """Build a PairingKeys from the dict form produced by `to_dict`.

        Entries missing from `keys_dict` are left as None.
        """
        keys = PairingKeys()
        keys.address_type = keys_dict.get('address_type')
        for key_name in PairingKeys._KEY_NAMES:
            setattr(keys, key_name, PairingKeys.key_from_dict(keys_dict, key_name))
        return keys

    def to_dict(self):
        """Return the dict form of these keys, omitting every entry that is None."""
        keys = {}
        if self.address_type is not None:
            keys['address_type'] = self.address_type
        for key_name in self._KEY_NAMES:
            key = getattr(self, key_name)
            if key is not None:
                keys[key_name] = key.to_dict()
        return keys

    def print(self, prefix=''):
        """Print the non-None entries to stdout, each line starting with `prefix`.

        Entries whose dict form is itself a dict (the keys) are printed as a
        heading followed by one line per field; other entries are printed on a
        single line.
        """
        for name, value in self.to_dict().items():
            if isinstance(value, dict):
                print(f'{prefix}{color(name, "cyan")}:')
                for key_property, key_value in value.items():
                    print(f'{prefix} {color(key_property, "green")}: {key_value}')
            else:
                print(f'{prefix}{color(name, "cyan")}: {value}')
# -----------------------------------------------------------------------------
class KeyStore:
    """Base key store.

    The storage methods here store nothing: `delete` and `update` do nothing,
    `get` returns an empty `PairingKeys` and `get_all` returns an empty list.
    Subclasses override them to provide actual persistence.
    """

    async def delete(self, name):
        """Delete the entry stored under `name` (does nothing in this base class)."""

    async def update(self, name, keys):
        """Store `keys` under `name` (does nothing in this base class)."""

    async def get(self, name):
        """Return the keys stored under `name` (always a new, empty PairingKeys here)."""
        return PairingKeys()

    async def get_all(self):
        """Return all entries as a list of (name, keys) pairs (always empty here)."""
        return []

    async def get_resolving_keys(self):
        """Return an (irk value, Address) pair for every entry that has an IRK.

        The Address is built from the entry name and the entry's address type;
        entries without an address type use Address.RANDOM_DEVICE_ADDRESS.
        """
        resolving_keys = []
        for peer_name, peer_keys in await self.get_all():
            if peer_keys.irk is None:
                # Nothing to resolve with for this entry
                continue
            address_type = (
                Address.RANDOM_DEVICE_ADDRESS
                if peer_keys.address_type is None
                else peer_keys.address_type
            )
            resolving_keys.append(
                (peer_keys.irk.value, Address(peer_name, address_type))
            )
        return resolving_keys

    async def print(self, prefix=''):
        """Print every entry to stdout, with a blank line between entries."""
        for index, (entry_name, entry_keys) in enumerate(await self.get_all()):
            # Every entry but the first is preceded by a newline
            leading = '\n' if index else ''
            print(leading + prefix + color(entry_name, 'yellow'))
            entry_keys.print(prefix=prefix + ' ')

    @staticmethod
    def create_for_device(device_config):
        """Create the key store selected by `device_config.keystore`.

        The text before the first ':' selects the store type. Returns None when
        no keystore is configured or when the type is not 'JsonKeyStore'.
        """
        keystore_spec = device_config.keystore
        if keystore_spec is None:
            return None

        keystore_type = keystore_spec.partition(':')[0]
        if keystore_type != 'JsonKeyStore':
            return None

        return JsonKeyStore.from_device_config(device_config)
# -----------------------------------------------------------------------------
class JsonKeyStore(KeyStore):
    """Key store persisted in a JSON file.

    The file contains one JSON object that maps a namespace to an object that
    maps entry names to the dict form of their keys. Each instance only reads
    and writes the entries of its own namespace. The file is loaded and saved
    in full on every operation.
    """

    APP_NAME = 'Bumble'
    APP_AUTHOR = 'Google'
    KEYS_DIR = 'Pairing'
    DEFAULT_NAMESPACE = '__DEFAULT__'

    def __init__(self, namespace, filename=None):
        """
        Args:
            namespace: namespace for this store's entries; DEFAULT_NAMESPACE is
                used when None.
            filename: path of the JSON file. When None, a file named after the
                namespace (lower-cased, ':' replaced with '-') is used, in the
                KEYS_DIR sub-directory of the per-user data directory reported
                by `appdirs`.
        """
        self.namespace = namespace if namespace is not None else self.DEFAULT_NAMESPACE

        if filename is None:
            # Use a default for the current user
            import appdirs

            self.directory_name = os.path.join(
                appdirs.user_data_dir(self.APP_NAME, self.APP_AUTHOR),
                self.KEYS_DIR,
            )
            json_filename = f'{self.namespace}.json'.lower().replace(':', '-')
            self.filename = os.path.join(self.directory_name, json_filename)
        else:
            self.filename = filename
            self.directory_name = os.path.dirname(os.path.abspath(self.filename))

        logger.debug(f'JSON keystore: {self.filename}')

    @staticmethod
    def from_device_config(device_config):
        """Create a store from a 'JsonKeyStore' or 'JsonKeyStore:<filename>' spec.

        The namespace is the string form of `device_config.address`. When the
        spec has no filename part (or an empty one), the default per-user file
        is used.
        """
        # Everything after the first ':' is the filename; it may contain ':'.
        # The resulting list has at most one element.
        params = device_config.keystore.split(':', 1)[1:]
        namespace = str(device_config.address)
        filename = params[0] if params and params[0] else None
        return JsonKeyStore(namespace, filename)

    async def load(self):
        """Return the full content of the JSON file, or {} if the file does not exist."""
        try:
            with open(self.filename, 'r', encoding='utf-8') as json_file:
                return json.load(json_file)
        except FileNotFoundError:
            return {}

    async def save(self, db):
        """Write `db` as the new full content of the JSON file."""
        # Create the directory if it doesn't exist
        os.makedirs(self.directory_name, exist_ok=True)

        # Save to a temporary file
        temp_filename = self.filename + '.tmp'
        with open(temp_filename, 'w', encoding='utf-8') as output:
            json.dump(db, output, sort_keys=True, indent=4)

        # Atomically replace the previous file. os.replace overwrites an
        # existing destination on every platform, unlike os.rename which fails
        # on Windows when the destination exists.
        os.replace(temp_filename, self.filename)

    async def delete(self, name):
        """Delete the entry stored under `name`.

        Raises:
            KeyError: if this store's namespace or the entry does not exist.
        """
        db = await self.load()

        namespace = db.get(self.namespace)
        if namespace is None:
            raise KeyError(name)

        del namespace[name]
        await self.save(db)

    async def update(self, name, keys):
        """Store the dict form of `keys` under `name`, replacing any previous entry."""
        db = await self.load()

        namespace = db.setdefault(self.namespace, {})
        namespace[name] = keys.to_dict()

        await self.save(db)

    async def get_all(self):
        """Return all entries of this store's namespace as (name, PairingKeys) pairs."""
        db = await self.load()

        namespace = db.get(self.namespace)
        if namespace is None:
            return []

        return [(name, PairingKeys.from_dict(keys)) for (name, keys) in namespace.items()]

    async def get(self, name):
        """Return the PairingKeys stored under `name`, or None if there is no such entry."""
        db = await self.load()

        namespace = db.get(self.namespace)
        if namespace is None:
            return None

        keys = namespace.get(name)
        if keys is None:
            return None

        return PairingKeys.from_dict(keys)