blob: 44ad4eb44048a3f884c5d3b1e09f780d43419f23 [file] [log] [blame]
#! python
#
# Backend for Silicon Labs CP2110/4 HID-to-UART devices.
#
# This file is part of pySerial. https://github.com/pyserial/pyserial
# (C) 2001-2015 Chris Liechti <cliechti@gmx.net>
# (C) 2019 Google LLC
#
# SPDX-License-Identifier: BSD-3-Clause
# This backend implements support for HID-to-UART devices manufactured
# by Silicon Labs and marketed as CP2110 and CP2114. The
# implementation is (mostly) OS-independent and in userland. It relies
# on cython-hidapi (https://github.com/trezor/cython-hidapi).
# The HID-to-UART protocol implemented by CP2110/4 is described in the
# AN434 document from Silicon Labs:
# https://www.silabs.com/documents/public/application-notes/AN434-CP2110-4-Interface-Specification.pdf
# TODO items:
# - rtscts support is configured for hardware flow control, but the
# signaling is missing (AN434 suggests this is done through GPIO).
# - Cancelling reads and writes is not supported.
# - Baudrate validation is not implemented, as it depends on model and configuration.
import struct
import threading
try:
import urlparse
except ImportError:
import urllib.parse as urlparse
try:
import Queue
except ImportError:
import queue as Queue
import hid # hidapi
import serial
from serial.serialutil import SerialBase, SerialException, PortNotOpenError, to_bytes, Timeout
# Report IDs and related constants.
#
# Each _REPORT_* value is used as the first byte of a feature report that
# the Serial class sends to the device; the constants listed directly under
# a report ID are the payload byte values sent together with that report.

# UART enable report, sent with _ENABLE_UART when the port is configured.
_REPORT_GETSET_UART_ENABLE = 0x41
_DISABLE_UART = 0x00
_ENABLE_UART = 0x01
# FIFO purge report; the payload byte selects which FIFO is purged
# (TX for reset_output_buffer, RX for reset_input_buffer).
_REPORT_SET_PURGE_FIFOS = 0x43
_PURGE_TX_FIFO = 0x01
_PURGE_RX_FIFO = 0x02
# UART configuration report: baudrate, parity, flow control, data bits
# and stop bits, packed by Serial._reconfigure_port().
_REPORT_GETSET_UART_CONFIG = 0x50
# Line break control reports, sent by Serial._update_break_state().
_REPORT_SET_TRANSMIT_LINE_BREAK = 0x51
_REPORT_SET_STOP_LINE_BREAK = 0x52
class Serial(SerialBase):
    """pySerial port backed by a Silicon Labs CP2110/CP2114 HID-to-UART bridge.

    The device is addressed with a ``cp2110://`` URL (see :meth:`from_url`).
    Incoming data is collected by a background reader thread that pushes the
    payload of every HID input report onto a queue; :meth:`read` consumes
    that queue and keeps any bytes it did not hand out for the next call.
    """

    # This is not quite correct. AN434 specifies that the minimum
    # baudrate is different between CP2110 and CP2114, and it's halved
    # when using non-8-bit symbols.
    BAUDRATES = (300, 375, 600, 1200, 1800, 2400, 4800, 9600, 19200,
                 38400, 57600, 115200, 230400, 460800, 500000, 576000,
                 921600, 1000000)

    # pySerial setting -> value of the matching byte in the UART config report.
    _CP2110_PARITY = {
        serial.PARITY_NONE: 0x00,
        serial.PARITY_ODD: 0x01,
        serial.PARITY_EVEN: 0x02,
        serial.PARITY_MARK: 0x03,
        serial.PARITY_SPACE: 0x04,
    }
    _CP2110_DATA_BITS = {5: 0x00, 6: 0x01, 7: 0x02, 8: 0x03}
    # NOTE(review): 1.5 and 2 stop bits intentionally share 0x01; AN434
    # appears to define only a "short" and a "long" stop bit setting, with
    # the long length depending on the number of data bits -- confirm.
    _CP2110_STOP_BITS = {
        serial.STOPBITS_ONE: 0x00,
        serial.STOPBITS_ONE_POINT_FIVE: 0x01,
        serial.STOPBITS_TWO: 0x01,
    }

    def __init__(self, *args, **kwargs):
        """Initialise backend state, then defer to SerialBase.

        The attributes are set before calling the base constructor so they
        already exist if the base class ends up calling :meth:`open`.
        """
        self._hid_handle = None
        self._read_buffer = None
        # Bytes already taken off the queue but not yet returned by read().
        self._read_pending = bytearray()
        self._thread = None
        super(Serial, self).__init__(*args, **kwargs)

    def open(self):
        """Open the HID device, configure the UART and start the reader thread.

        Raises:
            SerialException: if no port is configured, the port is already
                open, the URL is malformed or the HID device cannot be opened.
        """
        if self._port is None:
            raise SerialException("Port must be configured before it can be used.")
        if self.is_open:
            raise SerialException("Port is already open.")

        # Parse the URL first so a malformed URL leaves no state behind.
        portpath = self.from_url(self.portstr)

        self._read_buffer = Queue.Queue()
        self._read_pending = bytearray()

        handle = hid.device()
        try:
            handle.open_path(portpath)
        except OSError as msg:
            raise SerialException(msg.errno, "could not open port {}: {}".format(self._port, msg))
        # Only publish the handle once it is really open, so the rest of the
        # class never sees a handle that cannot be used.
        self._hid_handle = handle

        try:
            self._reconfigure_port()
        except BaseException:
            # Undo the open on any failure (including interrupts), then
            # let the original error propagate.
            try:
                self._hid_handle.close()
            except Exception:
                # Best-effort cleanup; the configuration error is the one
                # worth reporting.
                pass
            self._hid_handle = None
            raise
        else:
            self.is_open = True
            self._thread = threading.Thread(
                target=self._hid_read_loop,
                name='pySerial CP2110 reader thread for {}'.format(self._port))
            self._thread.daemon = True
            self._thread.start()

    def from_url(self, url):
        """Extract the HID device path from a ``cp2110://`` URL.

        Args:
            url: either ``cp2110:///dev/hidraw9`` (path form) or
                ``cp2110://0001:0023:00`` (bus:device:endpoint form).

        Returns:
            The device path as UTF-8 encoded bytes.

        Raises:
            SerialException: if the URL scheme is not ``cp2110``.
        """
        parts = urlparse.urlsplit(url)
        if parts.scheme != "cp2110":
            raise SerialException(
                'expected a string in the forms '
                '"cp2110:///dev/hidraw9" or "cp2110://0001:0023:00": '
                'not starting with cp2110:// {!r}'.format(parts.scheme))
        if parts.netloc:  # cp2110://BUS:DEVICE:ENDPOINT, for libusb
            return parts.netloc.encode('utf-8')
        return parts.path.encode('utf-8')

    def close(self):
        """Stop the reader thread and release the HID device.

        Safe to call on a port that was never opened or is already closed.
        """
        self.is_open = False
        # Work on a local reference: the reader thread sets self._thread to
        # None when it exits, which may happen while we are in here.
        thread = self._thread
        if thread is not None:
            thread.join(1)  # read timeout is 0.1
            self._thread = None
        if self._hid_handle is not None:
            self._hid_handle.close()
            self._hid_handle = None

    def _reconfigure_port(self):
        """Send the current serial settings to the device and enable the UART.

        Raises:
            ValueError: if parity, byte size or stop bits have a value this
                backend cannot translate.
        """
        parity_value = self._CP2110_PARITY.get(self._parity)
        if parity_value is None:
            raise ValueError('Invalid parity: {!r}'.format(self._parity))

        flow_control_value = 0x01 if self.rtscts else 0x00

        data_bits_value = self._CP2110_DATA_BITS.get(self._bytesize)
        if data_bits_value is None:
            raise ValueError('Invalid char len: {!r}'.format(self._bytesize))

        stop_bits_value = self._CP2110_STOP_BITS.get(self._stopbits)
        if stop_bits_value is None:
            raise ValueError('Invalid stop bit specification: {!r}'.format(self._stopbits))

        # Report ID, big-endian 32-bit baudrate, then one byte per setting.
        configuration_report = struct.pack(
            '>BLBBBB',
            _REPORT_GETSET_UART_CONFIG,
            self._baudrate,
            parity_value,
            flow_control_value,
            data_bits_value,
            stop_bits_value)

        self._hid_handle.send_feature_report(configuration_report)

        self._hid_handle.send_feature_report(
            bytes((_REPORT_GETSET_UART_ENABLE, _ENABLE_UART)))
        self._update_break_state()

    @property
    def in_waiting(self):
        """Number of received bytes that read() can return without waiting.

        Raises:
            PortNotOpenError: if the port is not open.
        """
        if not self.is_open:
            raise PortNotOpenError()
        # The queue holds whole chunks, so add up their lengths. The queue's
        # own lock keeps the reader thread from modifying it meanwhile.
        with self._read_buffer.mutex:
            queued = sum(len(chunk) for chunk in self._read_buffer.queue
                         if chunk is not None)
        return len(self._read_pending) + queued

    def reset_input_buffer(self):
        """Purge the device RX FIFO and drop all data buffered locally.

        Raises:
            PortNotOpenError: if the port is not open.
        """
        if not self.is_open:
            raise PortNotOpenError()
        self._hid_handle.send_feature_report(
            bytes((_REPORT_SET_PURGE_FIFOS, _PURGE_RX_FIFO)))
        # empty read buffer
        try:
            while True:
                self._read_buffer.get(False)
        except Queue.Empty:
            pass
        del self._read_pending[:]

    def reset_output_buffer(self):
        """Purge the device TX FIFO.

        Raises:
            PortNotOpenError: if the port is not open.
        """
        if not self.is_open:
            raise PortNotOpenError()
        self._hid_handle.send_feature_report(
            bytes((_REPORT_SET_PURGE_FIFOS, _PURGE_TX_FIFO)))

    def _update_break_state(self):
        """Start or stop transmitting a line break according to _break_state.

        Raises:
            PortNotOpenError: if there is no HID handle.
        """
        if not self._hid_handle:
            raise PortNotOpenError()

        if self._break_state:
            self._hid_handle.send_feature_report(
                bytes((_REPORT_SET_TRANSMIT_LINE_BREAK, 0)))
        else:
            # Note that while AN434 states "There are no data bytes in
            # the payload other than the Report ID", either hidapi or
            # Linux does not seem to send the report otherwise.
            self._hid_handle.send_feature_report(
                bytes((_REPORT_SET_STOP_LINE_BREAK, 0)))

    def read(self, size=1):
        """Read up to ``size`` bytes, waiting according to the port timeout.

        Returns:
            At most ``size`` bytes; fewer if the timeout expires first.

        Raises:
            PortNotOpenError: if the port is not open.
            SerialException: if more data is needed but the reader thread
                is no longer running.
        """
        if not self.is_open:
            raise PortNotOpenError()

        wanted = max(size, 0)
        # Serve bytes left over from an earlier call first. They are only
        # removed from the pending buffer once this call is certain to
        # return, so an exception below does not lose them.
        data = bytearray(self._read_pending[:wanted])
        from_pending = len(data)
        surplus = b''
        try:
            timeout = Timeout(self._timeout)
            while len(data) < wanted:
                if self._thread is None:
                    raise SerialException('connection failed (reader thread died)')
                buf = self._read_buffer.get(True, timeout.time_left())
                if buf is None:
                    break
                # A chunk may carry more than the caller asked for; keep the
                # remainder for the next read instead of over-delivering.
                missing = wanted - len(data)
                data += buf[:missing]
                surplus = buf[missing:]
                if timeout.expired():
                    break
        except Queue.Empty:  # -> timeout
            pass
        del self._read_pending[:from_pending]
        self._read_pending += surplus
        return bytes(data)

    def write(self, data):
        """Send ``data`` to the device, split into HID reports.

        Returns:
            The number of bytes written.

        Raises:
            PortNotOpenError: if the port is not open.
            SerialException: if the HID layer reports a failed write.
        """
        if not self.is_open:
            raise PortNotOpenError()
        data = to_bytes(data)
        total = len(data)
        max_chunk = 0x3F  # largest payload carried by a single report
        for offset in range(0, total, max_chunk):
            chunk = data[offset:offset + max_chunk]
            # The first byte of the report is the number of payload bytes.
            report = to_bytes([len(chunk)]) + chunk
            result = self._hid_handle.write(report)
            if result is not None and result < 0:
                raise SerialException(
                    'write failed (HID write returned {})'.format(result))
        return total

    def _hid_read_loop(self):
        """Reader thread body: queue the payload of each incoming report.

        Runs until the port is closed. ``self._thread`` is cleared on exit
        (normal or not) so that read() can tell the reader is gone.
        """
        try:
            while self.is_open:
                data = self._hid_handle.read(64, timeout_ms=100)
                if not data:
                    continue
                # The first byte is the payload length. Slice by it rather
                # than asserting on the report size: the check must survive
                # "python -O", and padded reports must not kill the reader.
                data_len = data[0]
                payload = data[1:1 + data_len]
                if payload:
                    self._read_buffer.put(bytearray(payload))
        finally:
            self._thread = None