bluetooth_device: add methods for pairing
This patch reworks on bluetooth device control objects on
the server side as well as the client side to support pairing.
BUG=chromium:610238
TEST=None
Change-Id: I257bcfb9285d97a166815e469c35b84187e94c06
Reviewed-on: https://chromium-review.googlesource.com/349899
Commit-Ready: Shyh-In Hwang <josephsih@chromium.org>
Tested-by: Shyh-In Hwang <josephsih@chromium.org>
Reviewed-by: Wai-Hong Tam <waihong@google.com>
diff --git a/client/cros/bluetooth/bluetooth_device_xmlrpc_server.py b/client/cros/bluetooth/bluetooth_device_xmlrpc_server.py
index be58d79..7d43d56 100755
--- a/client/cros/bluetooth/bluetooth_device_xmlrpc_server.py
+++ b/client/cros/bluetooth/bluetooth_device_xmlrpc_server.py
@@ -22,19 +22,25 @@
from autotest_lib.client.cros import xmlrpc_server
-class _PinAgent(dbus.service.Object):
- """The agent handling bluetooth device with a known pin code.
+class PairingAgent(dbus.service.Object):
+ """The agent handling the authentication process of bluetooth pairing.
- _PinAgent overrides RequestPinCode method to return a given pin code.
- User can use this agent to pair bluetooth device which has a known pin code.
+ PairingAgent overrides RequestPinCode method to return a given pin code.
+ User can use this agent to pair bluetooth device which has a known
+ pin code.
+
+ TODO (josephsih): more pairing modes other than pin code would be
+ supported later.
"""
+
def __init__(self, pin, *args, **kwargs):
- super(_PinAgent, self).__init__(*args, **kwargs)
+ super(PairingAgent, self).__init__(*args, **kwargs)
self._pin = pin
- @dbus.service.method('org.bluez.Agent1', in_signature="o", out_signature="s")
+ @dbus.service.method('org.bluez.Agent1',
+ in_signature='o', out_signature='s')
def RequestPinCode(self, device_path):
"""Requests pin code for a device.
@@ -45,7 +51,7 @@
@returns: The known pin code.
"""
- logging.info('RequestPinCode for %s, return %s', device_path, self._pin)
+ logging.info('RequestPinCode for %s; return %s', device_path, self._pin)
return self._pin
@@ -82,6 +88,8 @@
BLUEZ_PROFILE_MANAGER_PATH = '/org/bluez'
BLUEZ_PROFILE_MANAGER_IFACE = 'org.bluez.ProfileManager1'
BLUEZ_ERROR_ALREADY_EXISTS = 'org.bluez.Error.AlreadyExists'
+ DBUS_PROP_IFACE = 'org.freedesktop.DBus.Properties'
+ AGENT_PATH = '/test/agent'
BLUETOOTH_LIBDIR = '/var/lib/bluetooth'
@@ -129,7 +137,9 @@
# The agent to handle pin code request, which will be
# created when user calls pair_legacy_device method.
- self._pin_agent = None
+ self._pairing_agent = None
+ # The default capability of the agent.
+ self._capability = 'KeyboardDisplay'
@xmlrpc_server.dbus_safe(False)
@@ -560,6 +570,32 @@
@xmlrpc_server.dbus_safe(False)
+ def get_device_by_address(self, address):
+ """Read information about the remote device with the specified address.
+
+ @param address: Address of the device to get.
+
+ @return the properties of the device as a JSON-encoded dictionary
+ on success, the value False otherwise.
+
+ """
+ objects = self._bluez.GetManagedObjects(
+ dbus_interface=self.BLUEZ_MANAGER_IFACE, byte_arrays=True)
+ devices = []
+ for path, ifaces in objects.iteritems():
+ if self.BLUEZ_DEVICE_IFACE in ifaces:
+ device = objects[path][self.BLUEZ_DEVICE_IFACE]
+ if device.get('Address') == address:
+ return json.dumps(device)
+
+ devices = json.loads(self.get_devices())
+ for device in devices:
+ if device.get['Address'] == address:
+ return json.dumps(device)
+ return json.dumps(dict())
+
+
+ @xmlrpc_server.dbus_safe(False)
def start_discovery(self):
"""Start discovery of remote devices.
@@ -666,33 +702,33 @@
@xmlrpc_server.dbus_safe(False)
- def _setup_pin_agent(self, pin):
- """Initializes a _PinAgent and registers it to handle pin code request.
+ def _setup_pairing_agent(self, pin):
+ """Initializes and resiters a PairingAgent to handle authenticaiton.
@param pin: The pin code this agent will answer.
"""
- agent_path = '/test/agent'
- if self._pin_agent:
+ if self._pairing_agent:
logging.info('Removing the old agent before initializing a new one')
- self._pin_agent.remove_from_connection()
- self._pin_agent = None
- self._pin_agent = _PinAgent(pin, self._system_bus, agent_path)
+ self._pairing_agent.remove_from_connection()
+ self._pairing_agent = None
+ self._pairing_agent= PairingAgent(pin, self._system_bus,
+ self.AGENT_PATH)
agent_manager = dbus.Interface(
self._system_bus.get_object(self.BLUEZ_SERVICE_NAME,
self.BLUEZ_AGENT_MANAGER_PATH),
self.BLUEZ_AGENT_MANAGER_IFACE)
try:
- agent_manager.RegisterAgent(agent_path, 'NoInputNoOutput')
+ agent_manager.RegisterAgent(self.AGENT_PATH, self._capability)
except dbus.exceptions.DBusException, e:
if e.get_dbus_name() == self.BLUEZ_ERROR_ALREADY_EXISTS:
logging.info('Unregistering old agent and registering the new')
- agent_manager.UnregisterAgent(agent_path)
- agent_manager.RegisterAgent(agent_path, 'NoInputNoOutput')
+ agent_manager.UnregisterAgent(self.AGENT_PATH)
+ agent_manager.RegisterAgent(self.AGENT_PATH, self._capability)
else:
logging.error('Error setting up pin agent: %s', e)
raise
- logging.info('Agent registered')
+ logging.info('Agent registered: %s', self.AGENT_PATH)
@xmlrpc_server.dbus_safe(False)
@@ -710,6 +746,22 @@
@xmlrpc_server.dbus_safe(False)
+ def device_is_paired(self, address):
+ """Checks if a device is paired.
+
+ @param address: address of the device.
+
+ @returns: True if device is paired. False otherwise.
+
+ """
+ device = self._find_device(address)
+ if not device:
+ logging.error('Device not found')
+ return False
+ return self._is_paired(device)
+
+
+ @xmlrpc_server.dbus_safe(False)
def _is_connected(self, device):
"""Checks if a device is connected.
@@ -725,14 +777,82 @@
@xmlrpc_server.dbus_safe(False)
- def pair_legacy_device(self, address, pin, timeout):
+ def _set_trusted_by_device(self, device, trusted=True):
+ """Set the device trusted by device object.
+
+ @param device: the device object to set trusted.
+ @param trusted: True or False indicating whether to set trusted or not.
+
+ @returns: True if successful. False otherwise.
+
+ """
+ try:
+ properties = dbus.Interface(device, self.DBUS_PROP_IFACE)
+ properties.Set(self.BLUEZ_DEVICE_IFACE, 'Trusted', trusted)
+ return True
+ except Exception as e:
+ logging.error('_set_trusted_by_device: %s', e)
+ except:
+ logging.error('_set_trusted_by_device: unexpected error')
+ return False
+
+
+ @xmlrpc_server.dbus_safe(False)
+ def _set_trusted_by_path(self, device_path, trusted=True):
+ """Set the device trusted by the device path.
+
+ @param device_path: the object path of the device.
+ @param trusted: True or False indicating whether to set trusted or not.
+
+ @returns: True if successful. False otherwise.
+
+ """
+ try:
+ device = self._system_bus.get_object(self.BLUEZ_SERVICE_NAME,
+ device_path)
+ return self._set_trusted_by_device(device, trusted)
+ except Exception as e:
+ logging.error('_set_trusted_by_path: %s', e)
+ except:
+ logging.error('_set_trusted_by_path: unexpected error')
+ return False
+
+
+ @xmlrpc_server.dbus_safe(False)
+ def set_trusted(self, address, trusted=True):
+ """Set the device trusted by address.
+
+ @param address: The bluetooth address of the device.
+ @param trusted: True or False indicating whether to set trusted or not.
+
+ @returns: True if successful. False otherwise.
+
+ """
+ try:
+ device = self._find_device(address)
+ return self._set_trusted_by_device(device, trusted)
+ except Exception as e:
+ logging.error('set_trusted: %s', e)
+ except:
+ logging.error('set_trusted: unexpected error')
+ return False
+
+
+ @xmlrpc_server.dbus_safe(False)
+ def pair_legacy_device(self, address, pin, trusted, timeout=60):
"""Pairs a device with a given pin code.
Registers a agent who handles pin code request and
pairs a device with known pin code.
+ Note that the adapter does not automatically connnect to the device
+ when pairing is done. The connect_device() method has to be invoked
+ explicitly to connect to the device. This provides finer control
+ for testing purpose.
+
@param address: Address of the device to pair.
@param pin: The pin code of the device to pair.
+ @param trusted: indicating whether to set the device trusted.
@param timeout: The timeout in seconds for pairing.
@returns: True on success. False otherwise.
@@ -746,13 +866,19 @@
logging.info('Device is already paired')
return True
- self._setup_pin_agent(pin)
+ device_path = device.object_path
+ logging.info('Device %s is found.' % device.object_path)
+
+ self._setup_pairing_agent(pin)
mainloop = gobject.MainLoop()
def pair_reply():
"""Handler when pairing succeeded."""
- logging.info('Device paired')
+ logging.info('Device paired: %s', device_path)
+ if trusted:
+ self._set_trusted_by_path(device_path, trusted=True)
+ logging.info('Device trusted: %s', device_path)
mainloop.quit()
@@ -766,7 +892,8 @@
try:
error_name = error.get_dbus_name()
if error_name == 'org.freedesktop.DBus.Error.NoReply':
- logging.error('Timed out. Cancelling pairing')
+ logging.error('Timed out after %d ms. Cancelling pairing.',
+ timeout)
device.CancelPairing()
else:
logging.error('Pairing device failed: %s', error)
diff --git a/server/cros/bluetooth/bluetooth_device.py b/server/cros/bluetooth/bluetooth_device.py
index e48c223..8f45e47 100644
--- a/server/cros/bluetooth/bluetooth_device.py
+++ b/server/cros/bluetooth/bluetooth_device.py
@@ -350,6 +350,27 @@
return json.loads(self._proxy.get_devices())
+ def get_device_properties(self, address):
+ """Read information about remote devices known to the adapter.
+
+ An example of the device information of RN-42 looks like
+
+ @param address: Address of the device to pair.
+ @param pin: The pin code of the device to pair.
+ @param timeout: The timeout in seconds for pairing.
+
+ @returns: a dictionary of device properties of the device on success;
+ an empty dictionary otherwise.
+
+ """
+ return json.loads(self._proxy.get_device_by_address(address))
+
+ for device in self.get_devices():
+ if device.get['Address'] == address:
+ return device
+ return dict()
+
+
def start_discovery(self):
"""Start discovery of remote devices.
@@ -424,7 +445,30 @@
return self._proxy.has_device(address)
- def pair_legacy_device(self, address, pin, timeout):
+ def device_is_paired(self, address):
+ """Checks if a device is paired.
+
+ @param address: address of the device.
+
+ @returns: True if device is paired. False otherwise.
+
+ """
+ return self._proxy.device_is_paired(address)
+
+
+ def set_trusted(self, address, trusted=True):
+ """Set the device trusted.
+
+ @param address: The bluetooth address of the device.
+ @param trusted: True or False indicating whether to set trusted or not.
+
+ @returns: True if successful. False otherwise.
+
+ """
+ return self._proxy.set_trusted(address, trusted)
+
+
+ def pair_legacy_device(self, address, pin, trusted, timeout):
"""Pairs a device with a given pin code.
Registers an agent who handles pin code request and
@@ -432,12 +476,27 @@
@param address: Address of the device to pair.
@param pin: The pin code of the device to pair.
+ @param trusted: indicating whether to set the device trusted.
@param timeout: The timeout in seconds for pairing.
@returns: True on success. False otherwise.
"""
- return self._proxy.pair_legacy_device(address, pin, timeout)
+ return self._proxy.pair_legacy_device(address, pin, trusted, timeout)
+
+
+ def remove_device_object(self, address):
+ """Removes a device object and the pairing information.
+
+ Calls RemoveDevice method to remove remote device
+ object and the pairing information.
+
+ @param address: address of the device to unpair.
+
+ @returns: True on success. False otherwise.
+
+ """
+ return self._proxy.remove_device_object(address)
def connect_device(self, address):