rs485: redo RS485 support, removed RTS toggle function
diff --git a/serial/rs485.py b/serial/rs485.py
new file mode 100644
index 0000000..cc17378
--- /dev/null
+++ b/serial/rs485.py
@@ -0,0 +1,91 @@
+#!/usr/bin/env python
+
+# RS485 support
+#
+# (C) 2015 Chris Liechti <cliechti@gmx.net>
+
+"""\
+The settings for RS485 are stored in a dedicated object that can be applied to
+serial ports (where supported).
+NOTE: Some implementations may only support a subset of the settings.
+"""
+
+import time
+import serial
+
+class RS485Settings(object):
+ def __init__(self,
+ rts_level_for_tx=True,
+ rts_level_for_rx=False,
+ loopback=False,
+ delay_before_tx=None,
+ delay_before_rx=None):
+ self.rts_level_for_tx = rts_level_for_tx
+ self.rts_level_for_rx = rts_level_for_rx
+ self.loopback = loopback
+ self.delay_before_tx = delay_before_tx
+ self.delay_before_rx = delay_before_rx
+
+
+class RS485(serial.Serial):
+ """\
+ A subclass that replaces the write method with one that toggles RTS
+ according to the RS485 settings.
+
+ NOTE: This may work unreliably on some serial ports (control signals not
+ synchronized or delayed compared to data). Using delays may be
+ unreliable (varying times, larger than expected) as the OS may not
+ support very fine grained delays (no smaller than in the order of
+ tens of milliseconds).
+
+ NOTE: Some implementations support this natively. Better performance
+ can be expected when the native version is used.
+
+ NOTE: The loopback property is ignored by this implementation. The actual
+ behavior depends on the used hardware.
+
+ Usage:
+
+ ser = RS485(...)
+ ser.rs485_mode = RS485Settings(...)
+ ser.write(b'hello')
+ """
+
+ def __init__(self, *args, **kwargs):
+ super(RS485, self).__init__(*args, **kwargs)
+ self._alternate_rs485_settings = None
+
+
+ def write(self, b):
+ """Write to port, controlling RTS before and after transmitting."""
+ if self._alternate_rs485_settings is not None:
+ # apply level for TX and optional delay
+ self.setRTS(self._alternate_rs485_settings.rts_level_for_tx)
+ if self._alternate_rs485_settings.delay_before_tx is not None:
+ time.sleep(self._alternate_rs485_settings.delay_before_tx)
+ # write and wait for data to be written
+ super(RS485, self).write(b)
+ super(RS485, self).flush()
+ # optional delay and apply level for RX
+ if self._alternate_rs485_settings.delay_before_rx is not None:
+ time.sleep(self._alternate_rs485_settings.delay_before_rx)
+ self.setRTS(self._alternate_rs485_settings.rts_level_for_rx)
+ else:
+ super(RS485, self).write(b)
+
+
+ # redirect where the property stores the settings so that underlying Serial
+ # instance does not see them
+ @property
+ def rs485_mode(self):
+ """\
+ Enable RS485 mode and apply new settings, set to None to disable.
+ See serial.rs485.RS485Settings for more info about the value.
+ """
+ return self._alternate_rs485_settings
+
+ @rs485_mode.setter
+ def rs485_mode(self, rs485_settings):
+ self._alternate_rs485_settings = rs485_settings
+
+
diff --git a/serial/serialposix.py b/serial/serialposix.py
index 0c3052a..26fecbd 100644
--- a/serial/serialposix.py
+++ b/serial/serialposix.py
@@ -12,8 +12,15 @@
#
# references: http://www.easysw.com/~mike/serial/serial.html
-import sys, os, fcntl, termios, struct, select, errno, time
+import errno
+import fcntl
import io
+import os
+import select
+import struct
+import sys
+import termios
+import time
from serial.serialutil import *
@@ -23,6 +30,7 @@
plat = sys.platform.lower()
if plat[:5] == 'linux': # Linux (confirmed)
+ import array
def device(port):
return '/dev/ttyS%d' % port
@@ -33,7 +41,6 @@
def set_special_baudrate(port, baudrate):
# right size is 44 on x86_64, allow for some growth
- import array
buf = array.array('i', [0] * 64)
try:
@@ -83,6 +90,41 @@
4000000: 0o010017
}
+ # RS485 ioctls
+ TIOCGRS485 = 0x542E
+ TIOCSRS485 = 0x542F
+ SER_RS485_ENABLED = 0b00000001
+ SER_RS485_RTS_ON_SEND = 0b00000010
+ SER_RS485_RTS_AFTER_SEND = 0b00000100
+ SER_RS485_RX_DURING_TX = 0b00010000
+
+ def set_rs485_mode(port, rs485_settings):
+ buf = array.array('i', [0] * 8) # flags, delaytx, delayrx, padding
+
+ try:
+ fcntl.ioctl(port.fd, TIOCGRS485, buf)
+ if rs485_settings is not None:
+ if rs485_settings.loopback:
+ buf[0] |= SER_RS485_RX_DURING_TX
+ else:
+ buf[0] &= ~SER_RS485_RX_DURING_TX
+ if rs485_settings.rts_level_for_tx:
+ buf[0] |= SER_RS485_RTS_ON_SEND
+ else:
+ buf[0] &= ~SER_RS485_RTS_ON_SEND
+ if rs485_settings.rts_level_for_rx:
+ buf[0] |= SER_RS485_RTS_AFTER_SEND
+ else:
+ buf[0] &= ~SER_RS485_RTS_AFTER_SEND
+ buf[1] = int(rs485_settings.delay_rts_before_send * 1000)
+ buf[2] = int(rs485_settings.delay_rts_after_send * 1000)
+ else:
+ buf[0] = 0 # clear SER_RS485_ENABLED
+ res = fcntl.ioctl(port.fd, TIOCSRS485, buf)
+ except IOError as e:
+ raise ValueError('Failed to set RS485 mode: %s' % (e,))
+
+
elif plat == 'cygwin': # cygwin/win32 (confirmed)
def device(port):
@@ -127,13 +169,13 @@
baudrate_constants = {}
elif plat[:6] == 'darwin': # OS X
+ import array
version = os.uname()[2].split('.')
# Tiger or above can support arbitrary serial speeds
if int(version[0]) >= 8:
def set_special_baudrate(port, baudrate):
# use IOKit-specific call to set up high speeds
- import array
buf = array.array('i', [baudrate])
IOSSIOSPEED = 0x80045402 #_IOW('T', 2, speed_t)
fcntl.ioctl(port.fd, IOSSIOSPEED, buf, 1)
@@ -414,6 +456,10 @@
cflag &= ~(termios.CNEW_RTSCTS)
# XXX should there be a warning if setting up rtscts (and xonxoff etc) fails??
+ # XXX linux only
+ if self._rs485_mode is not None:
+ set_rs485_mode(self, self._rs485_mode)
+
# buffer
# vmin "minimal number of characters to be read. 0 for non blocking"
if vmin < 0 or vmin > 255:
diff --git a/serial/serialutil.py b/serial/serialutil.py
index 0844ecd..f23f024 100644
--- a/serial/serialutil.py
+++ b/serial/serialutil.py
@@ -123,6 +123,7 @@
self._rtscts = None # correct value is assigned below through properties
self._dsrdtr = None # correct value is assigned below through properties
self._interCharTimeout = None # correct value is assigned below through properties
+ self._rs485_mode = None # disabled by default
# assign values using get/set methods using the properties feature
self.port = port
@@ -333,6 +334,22 @@
# - - - - - - - - - - - - - - - - - - - - - - - -
+ # functions useful for RS-485 adapters
+
+ @property
+ def rs485_mode(self):
+ """\
+ Enable RS485 mode and apply new settings, set to None to disable.
+ See serial.rs485.RS485Settings for more info about the value.
+ """
+ return self._rs485_mode
+
+ @rs485_mode.setter
+ def rs485_mode(self, rs485_settings):
+ self._rs485_mode = rs485_settings
+ if self._isOpen: self._reconfigurePort()
+
+ # - - - - - - - - - - - - - - - - - - - - - - - -
_SETTINGS = ('baudrate', 'bytesize', 'parity', 'stopbits', 'xonxoff',
'dsrdtr', 'rtscts', 'timeout', 'writeTimeout', 'interCharTimeout')
diff --git a/serial/serialwin32.py b/serial/serialwin32.py
index a90662d..9e0d118 100644
--- a/serial/serialwin32.py
+++ b/serial/serialwin32.py
@@ -178,21 +178,42 @@
comDCB.fBinary = 1 # Enable Binary Transmission
# Char. w/ Parity-Err are replaced with 0xff (if fErrorChar is set to TRUE)
- if self._rtscts:
- comDCB.fRtsControl = win32.RTS_CONTROL_HANDSHAKE
- elif self._rtsToggle:
- comDCB.fRtsControl = win32.RTS_CONTROL_TOGGLE
+ if self._rs485_mode is None:
+ if self._rtscts:
+ comDCB.fRtsControl = win32.RTS_CONTROL_HANDSHAKE
+ else:
+ comDCB.fRtsControl = self._rtsState
+ comDCB.fOutxCtsFlow = self._rtscts
else:
- comDCB.fRtsControl = self._rtsState
+ # checks for unsupported settings
+ # XXX verify if platform really does not have a setting for those
+ if not self._rs485_mode.rts_level_for_tx:
+ raise ValueError(
+ 'Unsupported value for RS485Settings.rts_level_for_tx: %r' % (
+ self._rs485_mode.rts_level_for_tx,))
+ if self._rs485_mode.rts_level_for_rx:
+ raise ValueError(
+ 'Unsupported value for RS485Settings.rts_level_for_rx: %r' % (
+ self._rs485_mode.rts_level_for_rx,))
+ if self._rs485_mode.delay_before_tx is not None:
+ raise ValueError(
+ 'Unsupported value for RS485Settings.delay_before_tx: %r' % (
+ self._rs485_mode.delay_before_tx,))
+ if self._rs485_mode.delay_before_rx is not None:
+ raise ValueError(
+ 'Unsupported value for RS485Settings.delay_before_rx: %r' % (
+ self._rs485_mode.delay_before_rx,))
+ if self._rs485_mode.loopback:
+ raise ValueError(
+ 'Unsupported value for RS485Settings.loopback: %r' % (
+ self._rs485_mode.loopback,))
+ comDCB.fRtsControl = win32.RTS_CONTROL_TOGGLE
+ comDCB.fOutxCtsFlow = 0
+
if self._dsrdtr:
comDCB.fDtrControl = win32.DTR_CONTROL_HANDSHAKE
else:
comDCB.fDtrControl = self._dtrState
-
- if self._rtsToggle:
- comDCB.fOutxCtsFlow = 0
- else:
- comDCB.fOutxCtsFlow = self._rtscts
comDCB.fOutxDsrFlow = self._dsrdtr
comDCB.fOutX = self._xonxoff
comDCB.fInX = self._xonxoff
@@ -427,17 +448,20 @@
return comstat.cbOutQue
# functions useful for RS-485 adapters
- def setRtsToggle(self, rtsToggle):
- """Change RTS toggle control setting."""
- self._rtsToggle = rtsToggle
+ @property
+ def rs485_mode(self):
+ """\
+ enable RS485 mode and apply new settings, set to None to disable.
+ see serial.rs485 for more info about the value.
+ """
+ return self.rs485_settings
+
+ @rs485_mode.setter
+ def rs485_mode(self, rs485_settings):
+ self.rs485_settings = rs485_settings
+ self._rtsToggle = rs485_settings is not None
if self._isOpen: self._reconfigurePort()
- def getRtsToggle(self):
- """Get the current RTS toggle control setting."""
- return self._rtsToggle
-
- rtsToggle = property(getRtsToggle, setRtsToggle, doc="RTS toggle control setting")
-
t
# Nur Testfunktion!!