blob: 7916a924c1eca067378aba3a1da6308180038fef [file] [log] [blame]
#/usr/bin/env python3.4
#
# Copyright (C) 2016 The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not
# use this file except in compliance with the License. You may obtain a copy of
# the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations under
# the License.
import time
import os
from acts.keys import Config
from acts.utils import rand_ascii_str
from acts.test_utils.bt.bt_constants import gatt_cb_strings
from acts.test_utils.bt.bt_constants import gatt_characteristic
from acts.test_utils.bt.bt_constants import gatt_characteristic_value_format
from acts.test_utils.bt.bt_constants import gatt_cb_err
from acts.test_utils.bt.bt_constants import gatt_transport
from acts.test_utils.bt.bt_constants import gatt_event
from acts.test_utils.bt.bt_constants import gatt_server_responses
from acts.test_utils.bt.bt_constants import gatt_service_types
from acts.test_utils.bt.bt_constants import small_timeout
from acts.test_utils.bt.gatt_test_database import STRING_512BYTES
from acts.utils import exe_cmd
from math import ceil
class GattServerLib():
characteristic_list = []
default_timeout = 10
descriptor_list = []
dut = None
gatt_server = None
gatt_server_callback = None
gatt_server_list = []
log = None
service_list = []
write_mapping = {}
def __init__(self, log, dut):
self.dut = dut
self.log = log
def list_all_uuids(self):
"""From the GATT Client, discover services and list all services,
chars and descriptors.
"""
self.log.info("Service List:")
for service in self.dut.droid.gattGetServiceUuidList(self.gatt_server):
self.dut.log.info("GATT Server service uuid: {}".format(service))
self.log.info("Characteristics List:")
for characteristic in self.characteristic_list:
instance_id = self.dut.droid.gattServerGetCharacteristicInstanceId(
characteristic)
uuid = self.dut.droid.gattServerGetCharacteristicUuid(
characteristic)
self.dut.log.info(
"GATT Server characteristic handle uuid: {} {}".format(
hex(instance_id), uuid))
# TODO: add getting insance ids and uuids from each descriptor.
def open(self):
"""Open an empty GATT Server instance"""
self.gatt_server_callback = self.dut.droid.gattServerCreateGattServerCallback(
)
self.gatt_server = self.dut.droid.gattServerOpenGattServer(
self.gatt_server_callback)
self.gatt_server_list.append(self.gatt_server)
def clear_services(self):
"""Clear BluetoothGattServices from BluetoothGattServer"""
self.dut.droid.gattServerClearServices(self.gatt_server)
def close_bluetooth_gatt_servers(self):
"""Close Bluetooth Gatt Servers"""
try:
for btgs in self.gatt_server_list:
self.dut.droid.gattServerClose(btgs)
except Exception as err:
self.log.error(
"Failed to close Bluetooth GATT Servers: {}".format(err))
self.characteristic_list = []
self.descriptor_list = []
self.gatt_server_list = []
self.service_list = []
def characteristic_set_value_by_instance_id(self, instance_id, value):
"""Set Characteristic value by instance id"""
self.dut.droid.gattServerCharacteristicSetValueByInstanceId(
int(instance_id, 16), value)
def notify_characteristic_changed(self, instance_id, confirm):
""" Notify characteristic changed """
self.dut.droid.gattServerNotifyCharacteristicChangedByInstanceId(
self.gatt_server, 0, int(instance_id, 16), confirm)
def send_response(self, user_input):
"""Send a single response to the GATT Client"""
args = user_input.split()
mtu = 23
if len(args) == 2:
user_input = args[0]
mtu = int(args[1])
desc_read = gatt_event['desc_read_req']['evt'].format(
self.gatt_server_callback)
desc_write = gatt_event['desc_write_req']['evt'].format(
self.gatt_server_callback)
char_read = gatt_event['char_read_req']['evt'].format(
self.gatt_server_callback)
char_write = gatt_event['char_write']['evt'].format(
self.gatt_server_callback)
execute_write = gatt_event['exec_write']['evt'].format(
self.gatt_server_callback)
regex = "({}|{}|{}|{}|{})".format(desc_read, desc_write, char_read,
char_write, execute_write)
events = self.dut.ed.pop_events(regex, 5, small_timeout)
status = 0
if user_input:
status = gatt_server_responses.get(user_input)
for event in events:
self.log.debug("Found event: {}.".format(event))
request_id = event['data']['requestId']
if event['name'] == execute_write:
if ('execute' in event['data']
and event['data']['execute'] == True):
for key in self.write_mapping:
value = self.write_mapping[key]
self.log.info("Writing key, value: {}, {}".format(
key, value))
self.dut.droid.gattServerSetByteArrayValueByInstanceId(
key, value)
else:
self.log.info("Execute result is false")
self.write_mapping = {}
self.dut.droid.gattServerSendResponse(
self.gatt_server, 0, request_id, status, 0, [])
continue
offset = event['data']['offset']
instance_id = event['data']['instanceId']
if (event['name'] == desc_write or event['name'] == char_write):
if ('preparedWrite' in event['data']
and event['data']['preparedWrite'] == True):
value = event['data']['value']
if instance_id in self.write_mapping.keys():
self.write_mapping[
instance_id] = self.write_mapping[instance_id] + value
self.log.info(
"New Prepared Write Value for {}: {}".format(
instance_id, self.write_mapping[instance_id]))
else:
self.log.info("write mapping key, value {}, {}".format(
instance_id, value))
self.write_mapping[instance_id] = value
self.log.info("current value {}, {}".format(
instance_id, value))
self.dut.droid.gattServerSendResponse(
self.gatt_server, 0, request_id, status, 0, value)
continue
else:
self.dut.droid.gattServerSetByteArrayValueByInstanceId(
event['data']['instanceId'], event['data']['value'])
try:
data = self.dut.droid.gattServerGetReadValueByInstanceId(
int(event['data']['instanceId']))
except Exception as err:
self.log.error(err)
if not data:
data = [1]
self.log.info(
"GATT Server Send Response [request_id, status, offset, data]" \
" [{}, {}, {}, {}]".
format(request_id, status, offset, data))
data = data[offset:offset + mtu - 1]
self.dut.droid.gattServerSendResponse(
self.gatt_server, 0, request_id, status, offset, data)
def _setup_service(self, serv):
service = self.dut.droid.gattServerCreateService(
serv['uuid'], serv['type'])
if 'handles' in serv:
self.dut.droid.gattServerServiceSetHandlesToReserve(
service, serv['handles'])
return service
def _setup_characteristic(self, char):
characteristic = \
self.dut.droid.gattServerCreateBluetoothGattCharacteristic(
char['uuid'], char['properties'], char['permissions'])
if 'instance_id' in char:
self.dut.droid.gattServerCharacteristicSetInstanceId(
characteristic, char['instance_id'])
set_id = self.dut.droid.gattServerCharacteristicGetInstanceId(
characteristic)
if set_id != char['instance_id']:
self.log.error(
"Instance ID did not match up. Found {} Expected {}".
format(set_id, char['instance_id']))
if 'value_type' in char:
value_type = char['value_type']
value = char['value']
if value_type == gatt_characteristic_value_format['string']:
self.log.info("Set String value result: {}".format(
self.dut.droid.gattServerCharacteristicSetStringValue(
characteristic, value)))
elif value_type == gatt_characteristic_value_format['byte']:
self.log.info("Set Byte Array value result: {}".format(
self.dut.droid.gattServerCharacteristicSetByteValue(
characteristic, value)))
else:
self.log.info("Set Int value result: {}".format(
self.dut.droid.gattServerCharacteristicSetIntValue(
characteristic, value, value_type, char['offset'])))
return characteristic
def _setup_descriptor(self, desc):
descriptor = self.dut.droid.gattServerCreateBluetoothGattDescriptor(
desc['uuid'], desc['permissions'])
if 'value' in desc:
self.dut.droid.gattServerDescriptorSetByteValue(
descriptor, desc['value'])
if 'instance_id' in desc:
self.dut.droid.gattServerDescriptorSetInstanceId(
descriptor, desc['instance_id'])
self.descriptor_list.append(descriptor)
return descriptor
def setup_gatts_db(self, database):
"""Setup GATT Server database"""
self.gatt_server_callback = \
self.dut.droid.gattServerCreateGattServerCallback()
self.gatt_server = self.dut.droid.gattServerOpenGattServer(
self.gatt_server_callback)
self.gatt_server_list.append(self.gatt_server)
for serv in database['services']:
service = self._setup_service(serv)
self.service_list.append(service)
if 'characteristics' in serv:
for char in serv['characteristics']:
characteristic = self._setup_characteristic(char)
if 'descriptors' in char:
for desc in char['descriptors']:
descriptor = self._setup_descriptor(desc)
self.dut.droid.gattServerCharacteristicAddDescriptor(
characteristic, descriptor)
self.characteristic_list.append(characteristic)
self.dut.droid.gattServerAddCharacteristicToService(
service, characteristic)
self.dut.droid.gattServerAddService(self.gatt_server, service)
expected_event = gatt_cb_strings['serv_added'].format(
self.gatt_server_callback)
self.dut.ed.pop_event(expected_event, 10)
return self.gatt_server, self.gatt_server_callback
def send_continuous_response(self, user_input):
"""Send the same response"""
desc_read = gatt_event['desc_read_req']['evt'].format(
self.gatt_server_callback)
desc_write = gatt_event['desc_write_req']['evt'].format(
self.gatt_server_callback)
char_read = gatt_event['char_read_req']['evt'].format(
self.gatt_server_callback)
char_write = gatt_event['char_write']['evt'].format(
self.gatt_server_callback)
execute_write = gatt_event['exec_write']['evt'].format(
self.gatt_server_callback)
regex = "({}|{}|{}|{}|{})".format(desc_read, desc_write, char_read,
char_write, execute_write)
offset = 0
status = 0
mtu = 23
char_value = []
for i in range(512):
char_value.append(i % 256)
len_min = 470
end_time = time.time() + 180
i = 0
num_packets = ceil((len(char_value) + 1) / (mtu - 1))
while time.time() < end_time:
events = self.dut.ed.pop_events(regex, 10, small_timeout)
for event in events:
start_offset = i * (mtu - 1)
i += 1
self.log.debug("Found event: {}.".format(event))
request_id = event['data']['requestId']
data = char_value[start_offset:start_offset + mtu - 1]
if not data:
data = [1]
self.log.debug(
"GATT Server Send Response [request_id, status, offset, " \
"data] [{}, {}, {}, {}]".format(request_id, status, offset,
data))
self.dut.droid.gattServerSendResponse(
self.gatt_server, 0, request_id, status, offset, data)
def send_continuous_response_data(self, user_input):
"""Send the same response with data"""
desc_read = gatt_event['desc_read_req']['evt'].format(
self.gatt_server_callback)
desc_write = gatt_event['desc_write_req']['evt'].format(
self.gatt_server_callback)
char_read = gatt_event['char_read_req']['evt'].format(
self.gatt_server_callback)
char_write = gatt_event['char_write']['evt'].format(
self.gatt_server_callback)
execute_write = gatt_event['exec_write']['evt'].format(
self.gatt_server_callback)
regex = "({}|{}|{}|{}|{})".format(desc_read, desc_write, char_read,
char_write, execute_write)
offset = 0
status = 0
mtu = 11
char_value = []
len_min = 470
end_time = time.time() + 180
i = 0
num_packets = ceil((len(char_value) + 1) / (mtu - 1))
while time.time() < end_time:
events = self.dut.ed.pop_events(regex, 10, small_timeout)
for event in events:
self.log.info(event)
request_id = event['data']['requestId']
if event['name'] == execute_write:
if ('execute' in event['data']
and event['data']['execute'] == True):
for key in self.write_mapping:
value = self.write_mapping[key]
self.log.debug("Writing key, value: {}, {}".format(
key, value))
self.dut.droid.gattServerSetByteArrayValueByInstanceId(
key, value)
self.write_mapping = {}
self.dut.droid.gattServerSendResponse(
self.gatt_server, 0, request_id, status, 0, [1])
continue
offset = event['data']['offset']
instance_id = event['data']['instanceId']
if (event['name'] == desc_write
or event['name'] == char_write):
if ('preparedWrite' in event['data']
and event['data']['preparedWrite'] == True):
value = event['data']['value']
if instance_id in self.write_mapping:
self.write_mapping[
instance_id] = self.write_mapping[instance_id] + value
else:
self.write_mapping[instance_id] = value
else:
self.dut.droid.gattServerSetByteArrayValueByInstanceId(
event['data']['instanceId'],
event['data']['value'])
try:
data = self.dut.droid.gattServerGetReadValueByInstanceId(
int(event['data']['instanceId']))
except Exception as err:
self.log.error(err)
if not data:
self.dut.droid.gattServerSendResponse(
self.gatt_server, 0, request_id, status, offset, [1])
else:
self.dut.droid.gattServerSendResponse(
self.gatt_server, 0, request_id, status, offset,
data[offset:offset + 17])