nfc: Implement an interactive console for NFC debugging.
Wrote a very simple Python shell that has commands to do basic NFC
interactions through neard, such as enumerating adapters, tags, devices
and their properties; reading and writing records from tags; reading
records from devices and pushing NDEFS to devices.
BUG=chromium:305057
TEST=1. Compile Chrome OS with USE="nfc".
2. Make sure all commands work, using a USB NFC adapter, some NFC
tags and a Nexus 4 with NFC enabled.
Change-Id: I87c04ba8055f2f14bb092967f211310301f4435d
Reviewed-on: https://chromium-review.googlesource.com/172701
Reviewed-by: Arman Uguray <armansito@chromium.org>
Commit-Queue: Arman Uguray <armansito@chromium.org>
Tested-by: Arman Uguray <armansito@chromium.org>
diff --git a/client/cros/nfc/__init__.py b/client/cros/nfc/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/client/cros/nfc/__init__.py
diff --git a/client/cros/nfc/console.py b/client/cros/nfc/console.py
new file mode 100755
index 0000000..8c1c910
--- /dev/null
+++ b/client/cros/nfc/console.py
@@ -0,0 +1,673 @@
+#!/usr/bin/env python
+
+# Copyright (c) 2013 The Chromium OS Authors. All rights reserved.
+# Use of this source code is governed by a BSD-style license that can be
+# found in the LICENSE file.
+
+import cmd
+import dbus
+import dbus.exceptions
+import dbus.mainloop.glib
+import gobject
+import threading
+
+from functools import wraps
+
+
+DBUS_ERROR = 'org.freedesktop.DBus.Error'
+NEARD_PATH = '/org/neard/'
+PROMPT = 'NFC> '
+
+class NfcClientException(Exception):
+ """Exception class for exceptions thrown by NfcClient."""
+
+
+def print_message(message, newlines=2):
+ """
+ Prints the given message with extra wrapping newline characters.
+
+ @param message: Message to print.
+ @param newlines: Integer, specifying the number of '\n' characters that
+ should be padded at the beginning and end of |message| before
+ being passed to "print".
+
+ """
+ padding = newlines * '\n'
+ message = padding + message + padding
+ print message
+
+
+def handle_errors(func):
+ """
+ Decorator for handling exceptions that are commonly raised by many of the
+ methods in NfcClient.
+
+ @param func: The function this decorator is wrapping.
+
+ """
+ @wraps(func)
+ def _error_handler(*args):
+ try:
+ return func(*args)
+ except dbus.exceptions.DBusException as e:
+ if e.get_dbus_name() == DBUS_ERROR + '.ServiceUnknown':
+ print_message('neard may have crashed or disappeared. '
+ 'Check if neard is running and run "initialize" '
+ 'from this shell.')
+ return
+ if e.get_dbus_name() == DBUS_ERROR + '.UnknownObject':
+ print_message('Could not find object.')
+ return
+ print_message(str(e))
+ except Exception as e:
+ print_message(str(e))
+ return _error_handler
+
+
+class NfcClient(object):
+ """
+ neard D-Bus client
+
+ """
+ NEARD_SERVICE_NAME = 'org.neard'
+ IMANAGER = NEARD_SERVICE_NAME + '.Manager'
+ IADAPTER = NEARD_SERVICE_NAME + '.Adapter'
+ ITAG = NEARD_SERVICE_NAME + '.Tag'
+ IRECORD = NEARD_SERVICE_NAME + '.Record'
+ IDEVICE = NEARD_SERVICE_NAME + '.Device'
+
+ def __init__(self):
+ self._mainloop = None
+ self._mainloop_thread = None
+ self._adapters = {}
+ self._adapter_property_handler_matches = {}
+
+ def begin(self):
+ """
+ Starts the D-Bus client.
+
+ """
+ # Here we run a GLib MainLoop in its own thread, so that the client can
+ # listen to D-Bus signals while keeping the console interactive.
+ self._dbusmainloop = dbus.mainloop.glib.DBusGMainLoop(
+ set_as_default=True)
+ dbus.mainloop.glib.threads_init()
+ gobject.threads_init()
+
+ def _mainloop_thread_func():
+ self._mainloop = gobject.MainLoop()
+ context = self._mainloop.get_context()
+ self._run_loop = True
+ while self._run_loop:
+ context.iteration(True)
+ self._mainloop_thread = threading.Thread(None, _mainloop_thread_func)
+ self._mainloop_thread.start()
+
+ self._bus = dbus.SystemBus()
+ self.setup_manager()
+
+ def end(self):
+ """
+ Stops the D-Bus client.
+
+ """
+ self._run_loop = False
+ self._mainloop.quit()
+ self._mainloop_thread.join()
+
+ def restart(self):
+ """Reinitializes the NFC client."""
+ self.setup_manager()
+
+ @handle_errors
+ def _get_manager_proxy(self):
+ return dbus.Interface(
+ self._bus.get_object(self.NEARD_SERVICE_NAME, '/'),
+ self.IMANAGER)
+
+ @handle_errors
+ def _get_adapter_proxy(self, adapter):
+ return dbus.Interface(
+ self._bus.get_object(self.NEARD_SERVICE_NAME, adapter),
+ self.IADAPTER)
+
+ def _get_cached_adapter_proxy(self, adapter):
+ adapter_proxy = self._adapters.get(adapter, None)
+ if not adapter_proxy:
+ raise NfcClientException('Adapter "' + adapter + '" not found.')
+ return adapter_proxy
+
+
+ @handle_errors
+ def _get_tag_proxy(self, tag):
+ return dbus.Interface(
+ self._bus.get_object(self.NEARD_SERVICE_NAME, tag),
+ self.ITAG)
+
+ @handle_errors
+ def _get_device_proxy(self, device):
+ return dbus.Interface(
+ self._bus.get_object(self.NEARD_SERVICE_NAME, device),
+ self.IDEVICE)
+
+ @handle_errors
+ def _get_record_proxy(self, record):
+ return dbus.Interface(
+ self._bus.get_object(self.NEARD_SERVICE_NAME, record),
+ self.IRECORD)
+
+ @handle_errors
+ def _get_adapter_properties(self, adapter):
+ adapter_proxy = self._get_cached_adapter_proxy(adapter)
+ return adapter_proxy.GetProperties()
+
+ def _get_adapters(self):
+ props = self._manager.GetProperties()
+ return props.get('Adapters', None)
+
+ def setup_manager(self):
+ """
+ Creates a manager proxy and subscribes to adapter signals. This method
+ will also initialize proxies for adapters if any are available.
+
+ """
+ # Create the manager proxy.
+ self._adapters.clear()
+ self._manager = self._get_manager_proxy()
+ if not self._manager:
+ print_message('Failed to create a proxy to the Manager interface.')
+ return
+
+ # Listen to the adapter added and removed signals.
+ self._manager.connect_to_signal(
+ 'AdapterAdded',
+ lambda adapter: self.register_adapter(str(adapter)))
+ self._manager.connect_to_signal(
+ 'AdapterRemoved',
+ lambda adapter: self.unregister_adapter(str(adapter)))
+
+ # See if there are any adapters and create proxies for each.
+ adapters = self._get_adapters()
+ if adapters:
+ for adapter in adapters:
+ self.register_adapter(adapter)
+
+ def register_adapter(self, adapter):
+ """
+ Registers an adapter proxy with the given object path and subscribes to
+ adapter signals.
+
+ @param adapter: string, containing the adapter's D-Bus object path.
+
+ """
+ print_message('Added adapter: ' + adapter)
+ adapter_proxy = self._get_adapter_proxy(adapter)
+ self._adapters[adapter] = adapter_proxy
+
+ # Tag found/lost currently don't get fired. Monitor property changes
+ # instead.
+ if self._adapter_property_handler_matches.get(adapter, None) is None:
+ self._adapter_property_handler_matches[adapter] = (
+ adapter_proxy.connect_to_signal(
+ 'PropertyChanged',
+ (lambda name, value:
+ self._adapter_property_changed_signal(
+ adapter, name, value))))
+
+ def unregister_adapter(self, adapter):
+ """
+ Removes the adapter proxy for the given object path from the internal
+ cache of adapters.
+
+ @param adapter: string, containing the adapter's D-Bus object path.
+
+ """
+ print_message('Removed adapter: ' + adapter)
+ match = self._adapter_property_handler_matches.get(adapter, None)
+ if match is not None:
+ match.remove()
+ self._adapter_property_handler_matches.pop(adapter)
+ self._adapters.pop(adapter)
+
+ def _adapter_property_changed_signal(self, adapter, name, value):
+ if name == 'Tags' or name == 'Devices':
+ print_message('Found ' + name + ': ' +
+ self._dbus_array_to_string(value))
+ if len(value) == 0:
+ self.start_polling(adapter)
+
+ @handle_errors
+ def show_adapters(self):
+ """
+ Prints the D-Bus object paths of all adapters that are available.
+
+ """
+ adapters = self._get_adapters()
+ if not adapters:
+ print_message('No adapters found.')
+ return
+ for adapter in adapters:
+ print_message(' ' + str(adapter), newlines=0)
+ print
+
+ def _dbus_array_to_string(self, array):
+ string = '[ '
+ for value in array:
+ string += ' ' + str(value) + ', '
+ string += ' ]'
+ return string
+
+ def print_adapter_status(self, adapter):
+ """
+ Prints the properties of the given adapter.
+
+ @param adapter: string, containing the adapter's D-Bus object path.
+
+ """
+ props = self._get_adapter_properties(adapter)
+ if not props:
+ return
+ print_message('Status ' + adapter + ': ', newlines=0)
+ for key, value in props.iteritems():
+ if type(value) == dbus.Array:
+ value = self._dbus_array_to_string(value)
+ else:
+ value = str(value)
+ print_message(' ' + key + ' = ' + value, newlines=0)
+ print
+
+ @handle_errors
+ def set_powered(self, adapter, powered):
+ """
+ Enables or disables the adapter.
+
+ @param adapter: string, containing the adapter's D-Bus object path.
+ @param powered: boolean that dictates whether the adapter will be
+ enabled or disabled.
+
+ """
+ adapter_proxy = self._get_cached_adapter_proxy(adapter)
+ if not adapter_proxy:
+ return
+ adapter_proxy.SetProperty('Powered', powered)
+
+ @handle_errors
+ def start_polling(self, adapter):
+ """
+ Starts polling for nearby tags and devices in "Initiator" mode.
+
+ @param adapter: string, containing the adapter's D-Bus object path.
+
+ """
+ adapter_proxy = self._get_cached_adapter_proxy(adapter)
+ adapter_proxy.StartPollLoop('Initiator')
+ print_message('Started polling.')
+
+ @handle_errors
+ def stop_polling(self, adapter):
+ """
+ Stops polling for nearby tags and devices.
+
+ @param adapter: string, containing the adapter's D-Bus object path.
+
+ """
+ adapter_proxy = self._get_cached_adapter_proxy(adapter)
+ adapter_proxy.StopPollLoop()
+ self._polling_stopped = True
+ print_message('Stopped polling.')
+
+ @handle_errors
+ def show_tag_data(self, tag):
+ """
+ Prints the properties of the given tag, as well as the contents of any
+ records associated with it.
+
+ @param tag: string, containing the tag's D-Bus object path.
+
+ """
+ tag_proxy = self._get_tag_proxy(tag)
+ if not tag_proxy:
+ print_message('Tag "' + tag + '" not found.')
+ return
+ props = tag_proxy.GetProperties()
+ print_message('Tag ' + tag + ': ', newlines=1)
+ for key, value in props.iteritems():
+ if key != 'Records':
+ print_message(' ' + key + ' = ' + str(value), newlines=0)
+ records = props['Records']
+ if not records:
+ return
+ print_message('Records: ', newlines=1)
+ for record in records:
+ self.show_record_data(str(record))
+ print
+
+ @handle_errors
+ def show_device_data(self, device):
+ """
+ Prints the properties of the given device, as well as the contents of
+ any records associated with it.
+
+ @param device: string, containing the device's D-Bus object path.
+
+ """
+ device_proxy = self._get_device_proxy(device)
+ if not device_proxy:
+ print_message('Device "' + device + '" not found.')
+ return
+ records = device_proxy.GetProperties()['Records']
+ if not records:
+ print_message('No records on device.')
+ return
+ print_message('Records: ', newlines=1)
+ for record in records:
+ self.show_record_data(str(record))
+ print
+
+ @handle_errors
+ def show_record_data(self, record):
+ """
+ Prints the contents of the given record.
+
+ @param record: string, containing the record's D-Bus object path.
+
+ """
+ record_proxy = self._get_record_proxy(record)
+ if not record_proxy:
+ print_message('Record "' + record + '" not found.')
+ return
+ props = record_proxy.GetProperties()
+ print_message('Record ' + record + ': ', newlines=1)
+ for key, value in props.iteritems():
+ print ' ' + key + ' = ' + value
+ print
+
+ def _create_record_data(self, record_type, params):
+ if record_type == 'Text':
+ possible_keys = [ 'Encoding', 'Language', 'Representation' ]
+ tag_data = { 'Type': 'Text' }
+ elif record_type == 'URI':
+ possible_keys = [ 'URI' ]
+ tag_data = { 'Type': 'URI' }
+ else:
+ print_message('Writing record type "' + record_type +
+ '" currently not supported.')
+ return None
+ for key, value in params.iteritems():
+ if key in possible_keys:
+ tag_data[key] = value
+ return tag_data
+
+ @handle_errors
+ def write_tag(self, tag, record_type, params):
+ """
+ Writes an NDEF record to the given tag.
+
+ @param tag: string, containing the tag's D-Bus object path.
+ @param record_type: The type of the record, e.g. Text or URI.
+ @param params: dictionary, containing the parameters of the NDEF.
+
+ """
+ tag_data = self._create_record_data(record_type, params)
+ if not tag_data:
+ return
+ tag_proxy = self._get_tag_proxy(tag)
+ if not tag_proxy:
+ print_message('Tag "' + tag + '" not found.')
+ return
+ tag_proxy.Write(tag_data)
+ print_message('Tag written!')
+
+ @handle_errors
+ def push_to_device(self, device, record_type, params):
+ """
+ Pushes an NDEF record to the given device.
+
+ @param device: string, containing the device's D-Bus object path.
+ @param record_type: The type of the record, e.g. Text or URI.
+ @param params: dictionary, containing the parameters of the NDEF.
+
+ """
+ record_data = self._create_record_data(record_type, params)
+ if not record_data:
+ return
+ device_proxy = self._get_device_proxy(device)
+ if not device_proxy:
+ print_message('Device "' + device + '" not found.')
+ return
+ device_proxy.Push(record_data)
+ print_message('NDEF pushed to device!')
+
+
+class NfcConsole(cmd.Cmd):
+ """
+ Interactive console to interact with the NFC daemon.
+
+ """
+ def __init__(self):
+ cmd.Cmd.__init__(self)
+ self.prompt = PROMPT
+
+ def begin(self):
+ """
+ Starts the interactive shell.
+
+ """
+ print_message('NFC console! Run "help" for a list of commands.',
+ newlines=1)
+ self._nfc_client = NfcClient()
+ self._nfc_client.begin()
+ self.cmdloop()
+
+ def can_exit(self):
+ """Override"""
+ return True
+
+ def do_initialize(self, args):
+ """Handles "initialize"."""
+ if args:
+ print_message('Command "initialize" expects no arguments.')
+ return
+ self._nfc_client.restart()
+
+ def help_initialize(self):
+ """Prints the help message for "initialize"."""
+ print_message('Initializes the neard D-Bus client. This can be '
+ 'run many times to restart the client in case of '
+ 'neard failures or crashes.')
+
+ def do_adapters(self, args):
+ """Handles "adapters"."""
+ if args:
+ print_message('Command "adapters" expects no arguments.')
+ return
+ self._nfc_client.show_adapters()
+
+ def help_adapters(self):
+ """Prints the help message for "adapters"."""
+ print_message('Displays the D-Bus object paths of the available '
+ 'adapter objects.')
+
+ def do_adapter_status(self, args):
+ """Handles "adapter_status"."""
+ args = args.strip().split(' ')
+ if len(args) != 1 or not args[0]:
+ print_message('Usage: adapter_status <adapter>')
+ return
+ self._nfc_client.print_adapter_status(NEARD_PATH + args[0])
+
+ def help_adapter_status(self):
+ """Prints the help message for "adapter_status"."""
+ print_message('Returns the properties of the given NFC adapter.\n\n'
+ ' Ex: "adapter_status nfc0"')
+
+ def do_enable_adapter(self, args):
+ """Handles "enable_adapter"."""
+ args = args.strip().split(' ')
+ if len(args) != 1 or not args[0]:
+ print_message('Usage: enable_adapter <adapter>')
+ return
+ self._nfc_client.set_powered(NEARD_PATH + args[0], True)
+
+ def help_enable_adapter(self):
+ """Prints the help message for "enable_adapter"."""
+ print_message('Powers up the adapter. Ex: "enable_adapter nfc0"')
+
+ def do_disable_adapter(self, args):
+ """Handles "disable_adapter"."""
+ args = args.strip().split(' ')
+ if len(args) != 1 or not args[0]:
+ print_message('Usage: disable_adapter <adapter>')
+ return
+ self._nfc_client.set_powered(NEARD_PATH + args[0], False)
+
+ def help_disable_adapter(self):
+ """Prints the help message for "disable_adapter"."""
+ print_message('Powers down the adapter. Ex: "disable_adapter nfc0"')
+
+ def do_start_poll(self, args):
+ """Handles "start_poll"."""
+ args = args.strip().split(' ')
+ if len(args) != 1 or not args[0]:
+ print_message('Usage: start_poll <adapter>')
+ return
+ self._nfc_client.start_polling(NEARD_PATH + args[0])
+
+ def help_start_poll(self):
+ """Prints the help message for "start_poll"."""
+ print_message('Initiates a poll loop.\n\n Ex: "start_poll nfc0"')
+
+ def do_stop_poll(self, args):
+ """Handles "stop_poll"."""
+ args = args.split(' ')
+ if len(args) != 1 or not args[0]:
+ print_message('Usage: stop_poll <adapter>')
+ return
+ self._nfc_client.stop_polling(NEARD_PATH + args[0])
+
+ def help_stop_poll(self):
+ """Prints the help message for "stop_poll"."""
+ print_message('Stops a poll loop.\n\n Ex: "stop_poll nfc0"')
+
+ def do_read_tag(self, args):
+ """Handles "read_tag"."""
+ args = args.strip().split(' ')
+ if len(args) != 1 or not args[0]:
+ print_message('Usage read_tag <tag>')
+ return
+ self._nfc_client.show_tag_data(NEARD_PATH + args[0])
+
+ def help_read_tag(self):
+ """Prints the help message for "read_tag"."""
+ print_message('Reads the contents of a tag. Ex: read_tag nfc0/tag0')
+
+ def _parse_record_args(self, record_type, args):
+ if record_type == 'Text':
+ if len(args) < 5:
+ print_message('Usage: write_tag <tag> Text <encoding> '
+ '<language> <representation>')
+ return None
+ if args[2] not in [ 'UTF-8', 'UTF-16' ]:
+ print_message('Encoding must be one of "UTF-8" or "UTF-16".')
+ return None
+ return {
+ 'Encoding': args[2],
+ 'Language': args[3],
+ 'Representation': ' '.join(args[4:])
+ }
+ if record_type == 'URI':
+ if len(args) != 3:
+ print_message('Usage: write_tag <tag> URI <uri>')
+ return None
+ return {
+ 'URI': args[2]
+ }
+ print_message('Only types "Text" and "URI" are supported by this '
+ 'script.')
+ return None
+
+ def do_write_tag(self, args):
+ """Handles "write_tag"."""
+ args = args.strip().split(' ')
+ if len(args) < 3:
+ print_message('Usage: write_tag <tag> [params]')
+ return
+ record_type = args[1]
+ params = self._parse_record_args(record_type, args)
+ if not params:
+ return
+ self._nfc_client.write_tag(NEARD_PATH + args[0],
+ record_type, params)
+
+ def help_write_tag(self):
+ """Prints the help message for "write_tag"."""
+ print_message('Writes the given data to a tag. Usage:\n'
+ ' write_tag <tag> Text <encoding> <language> '
+ '<representation>\n write_tag <tag> URI <uri>')
+
+ def do_read_device(self, args):
+ """Handles "read_device"."""
+ args = args.strip().split(' ')
+ if len(args) != 1 or not args[0]:
+ print_message('Usage read_device <device>')
+ return
+ self._nfc_client.show_device_data(NEARD_PATH + args[0])
+
+ def help_read_device(self):
+ """Prints the help message for "read_device"."""
+ print_message('Reads the contents of a device. Ex: read_device '
+ 'nfc0/device0')
+
+ def do_push_to_device(self, args):
+ """Handles "push_to_device"."""
+ args = args.strip().split(' ')
+ if len(args) < 3:
+ print_message('Usage: push_to_device <device> [params]')
+ return
+ record_type = args[1]
+ params = self._parse_record_args(record_type, args)
+ if not params:
+ return
+ self._nfc_client.push_to_device(NEARD_PATH + args[0],
+ record_type, params)
+
+ def help_push_to_device(self):
+ """Prints the help message for "push_to_device"."""
+ print_message('Pushes the given data to a device. Usage:\n'
+ ' push_to_device <device> Text <encoding> <language> '
+ '<representation>\n push_to_device <device> URI <uri>')
+
+ def do_exit(self, args):
+ """
+ Handles the 'exit' command.
+
+ @param args: Arguments to the command. Unused.
+
+ """
+ if args:
+ print_message('Command "exit" expects no arguments.')
+ return
+ resp = raw_input('Are you sure? (yes/no): ')
+ if resp == 'yes':
+ print_message('Goodbye!')
+ self._nfc_client.end()
+ return True
+ if resp != 'no':
+ print_message('Did not understand: ' + resp)
+ return False
+
+ def help_exit(self):
+ """Handles the 'help exit' command."""
+ print_message('Exits the console.')
+
+ do_EOF = do_exit
+ help_EOF = help_exit
+
+
+def main():
+ """Main function."""
+ NfcConsole().begin()
+
+
+if __name__ == '__main__':
+ main()