miniterm: redo internals, bytes, text-to-text transformations, CR/LF conversion
diff --git a/serial/tools/miniterm.py b/serial/tools/miniterm.py
index 7864b0e..111748a 100644
--- a/serial/tools/miniterm.py
+++ b/serial/tools/miniterm.py
@@ -10,6 +10,7 @@
# done), received characters are displayed as is (or escaped through pythons
# repr, useful for debug purposes)
+import codecs
import os
import sys
import threading
@@ -25,6 +26,7 @@
raw_input
except NameError:
raw_input = input # in python3 it's "raw"
+ unichr = chr
EXITCHARCTER = b'\x1d' # GS/CTRL+]
MENUCHARACTER = b'\x14' # Menu: CTRL+T
@@ -60,7 +62,6 @@
--- Toggles:
--- %(rts)-7s RTS %(echo)-7s local echo
--- %(dtr)-7s DTR %(break)-7s BREAK
---- %(lfm)-7s line feed %(repr)-7s Cycle repr mode
---
--- Port settings (%(menu)s followed by the following):
--- p change port
@@ -75,9 +76,7 @@
'exit': key_description(EXITCHARCTER),
'menu': key_description(MENUCHARACTER),
'rts': key_description(b'\x12'),
- 'repr': key_description(b'\x01'),
'dtr': key_description(b'\x04'),
- 'lfm': key_description(b'\x0c'),
'break': key_description(b'\x02'),
'echo': key_description(b'\x05'),
'info': key_description(b'\x09'),
@@ -87,57 +86,72 @@
}
-LF = b'\n'
-CR = b'\r'
-CRLF = b'\r\n'
+class ConsoleBase(object):
+ def __init__(self):
+ if sys.version_info >= (3, 0):
+ self.byte_output = sys.stdout.buffer
+ else:
+ self.byte_output = sys.stdout
+ self.output = sys.stdout
-X00 = b'\x00'
-X0E = b'\x0e'
+ def setup(self):
+ pass # Do nothing for 'nt'
-# first choose a platform dependant way to read single characters from the console
-global console
+ def cleanup(self):
+ pass # Do nothing for 'nt'
+
+ def getkey(self):
+ return None
+
+ def write_bytes(self, s):
+ self.byte_output.write(s)
+ self.byte_output.flush()
+
+ def write(self, s):
+ self.output.write(s)
+ self.output.flush()
+
if os.name == 'nt':
import msvcrt
- class Console(object):
- def __init__(self):
- if sys.version_info >= (3, 0):
- self.output = sys.stdout.buffer
- else:
- self.output = sys.stdout
-
- def setup(self):
- pass # Do nothing for 'nt'
-
- def cleanup(self):
- pass # Do nothing for 'nt'
-
+ import ctypes
+ class Console(ConsoleBase):
def getkey(self):
while True:
- z = msvcrt.getch()
- if z == X00 or z == X0E: # functions keys, ignore
- msvcrt.getch()
+ z = msvcrt.getwch()
+ if z == '\r':
+ return '\n'
+ elif z in '\x00\x0e': # functions keys, ignore
+ msvcrt.getwch()
else:
- if z == CR:
- return LF
return z
-
+
def write(self, s):
- self.output.write(s)
- self.output.flush()
+ # works, kind of.. can't print unicode
+ self.byte_output.write(s.encode(sys.stdout.encoding, 'replace'))
+ #~ sys.stdout.write(s) # fails with encoding errors
+ #~ for byte in data: # fails for python3 as it returns ints instead of b''
+ #~ for x in range(len(s)):
+ #~ byte = s[x:x+1]
+ #~ msvcrt.putwch(byte) # blocks?!? non-overlapped win io?
+
+ def write_bytes(self, data):
+ #~ for byte in data: # fails for python3 as it returns ints instead of b''
+ for x in range(len(data)):
+ byte = data[x:x+1]
+ msvcrt.putch(byte)
elif os.name == 'posix':
import atexit
import termios
- class Console(object):
+ class Console(ConsoleBase):
def __init__(self):
+ super(Console, self).__init__()
self.fd = sys.stdin.fileno()
self.old = None
- if sys.version_info >= (3, 0):
- self.output = sys.stdout.buffer
- else:
- self.output = sys.stdout
atexit.register(self.cleanup)
+ if sys.version_info < (3, 0):
+ sys.stdin = codecs.getreader(sys.stdin.encoding)(sys.stdin)
def setup(self):
self.old = termios.tcgetattr(self.fd)
@@ -148,21 +162,134 @@
termios.tcsetattr(self.fd, termios.TCSANOW, new)
def getkey(self):
- c = os.read(self.fd, 1)
- return c
+ return sys.stdin.read(1)
+ #~ c = os.read(self.fd, 1)
+ #~ return c
def cleanup(self):
if self.old is not None:
termios.tcsetattr(self.fd, termios.TCSAFLUSH, self.old)
- def write(self, s):
- self.output.write(s)
- self.output.flush()
-
else:
raise NotImplementedError("Sorry no implementation for your platform (%s) available." % sys.platform)
+# XXX how to handle multi byte sequences like CRLF?
+# codecs.IncrementalEncoder would be a good choice
+
+class Transform(object):
+ def input(self, text):
+ """text received from serial port"""
+ return text
+
+ def output(self, text):
+ """text to be sent to serial port"""
+ return text
+
+ def echo(self, text):
+ """text to be sent but displayed on console"""
+ return text
+
+class CRLF(Transform):
+ """ENTER sends CR+LF"""
+ def input(self, text):
+ return text.replace('\r\n', '\n')
+
+ def output(self, text):
+ return text.replace('\n', '\r\n')
+
+class CR(Transform):
+ """ENTER sends CR"""
+ def input(self, text):
+ return text.replace('\r', '\n')
+
+ def output(self, text):
+ return text.replace('\n', '\r')
+
+class LF(Transform):
+ """ENTER sends LF"""
+
+
+class NoTerminal(Transform):
+ """remove typical terminal control codes from input"""
+ def input(self, text):
+ return ''.join(t if t >= ' ' or t in '\r\n' else unichr(0x2400 + ord(t)) for t in text)
+
+ echo = input
+
+
+class NoControls(Transform):
+ """Remove all control codes, incl. CR+LF"""
+ def input(self, text):
+ return ''.join(t if t >= ' ' else unichr(0x2400 + ord(t)) for t in text)
+
+ echo = input
+
+
+class HexDump(Transform):
+ """Complete hex dump"""
+ def input(self, text):
+ return ''.join('{:02x} '.format(ord(t)) for t in text)
+
+ echo = input
+
+
+class Printable(Transform):
+ """Show decimal code for all non-ASCII characters and control codes"""
+ def input(self, text):
+ r = []
+ for t in text:
+ if ' ' <= t < '\x7f' or t in '\r\n':
+ r.append(t)
+ else:
+ r.extend(unichr(0x2080 + ord(d) - 48) for d in '{:d}'.format(ord(t)))
+ r.append(' ')
+ return ''.join(r)
+
+ echo = input
+
+
+
+class Colorize(Transform):
+ """Apply different colors for input and echo"""
+ def __init__(self):
+ # XXX make it configurable, use colorama?
+ self.input_color = '\x1b[37m'
+ self.echo_color = '\x1b[31m'
+
+ def input(self, text):
+ return self.input_color + text
+
+ def echo(self, text):
+ return self.echo_color + text
+
+class DebugIO(Transform):
+ """Apply different colors for input and echo"""
+ def input(self, text):
+ sys.stderr.write('in: %r\n' % text)
+ return text
+
+ def echo(self, text):
+ sys.stderr.write('out: %r\n' % text)
+ return text
+
+# other ideas:
+# - add date/time for each newline
+# - insert newline after: a) timeout b) packet end character
+
+TRANSFORMATIONS = {
+ 'crlf': CRLF,
+ 'cr': CR,
+ 'lf': LF,
+ 'default': NoTerminal,
+ 'nocontrol': NoControls,
+ 'printable': Printable,
+ 'hex': HexDump,
+ 'colorize': Colorize,
+ 'debug': DebugIO,
+ }
+
+
def dump_port_list():
if comports:
sys.stderr.write('\n--- Available ports:\n')
@@ -171,25 +298,21 @@
sys.stderr.write('--- %-20s %s\n' % (port, desc))
-CONVERT_CRLF = 2
-CONVERT_CR = 1
-CONVERT_LF = 0
-NEWLINE_CONVERISON_MAP = (LF, CR, CRLF)
-LF_MODES = ('LF', 'CR', 'CR/LF')
-
-REPR_MODES = ('raw', 'some control', 'all control', 'hex')
-
class Miniterm(object):
- def __init__(self, port, baudrate, parity, rtscts, xonxoff, echo=False, convert_outgoing=CONVERT_CRLF, repr_mode=0):
+ def __init__(self, port, baudrate, parity, rtscts, xonxoff, echo=False, transformations=()):
self.console = Console()
self.serial = serial.serial_for_url(port, baudrate, parity=parity, rtscts=rtscts, xonxoff=xonxoff, timeout=1)
self.echo = echo
- self.repr_mode = repr_mode
- self.convert_outgoing = convert_outgoing
- self.newline = NEWLINE_CONVERISON_MAP[self.convert_outgoing]
self.dtr_state = True
self.rts_state = True
self.break_state = False
+ self.raw = False
+ self.input_encoding = 'latin1'
+ self.input_error_handling = 'replace'
+ self.output_encoding = 'latin1'
+ self.output_error_handling = 'ignore'
+ self.transformations = [TRANSFORMATIONS[t]() for t in transformations]
+ self.transformation_names = transformations
def _start_reader(self):
"""Start reader thread"""
@@ -240,51 +363,38 @@
(self.serial.getRI() and 'active' or 'inactive'),
(self.serial.getCD() and 'active' or 'inactive')))
except serial.SerialException:
- # on RFC 2217 ports it can happen to no modem state notification was
+ # on RFC 2217 ports, it can happen to no modem state notification was
# yet received. ignore this error.
pass
sys.stderr.write('--- software flow control: %s\n' % (self.serial.xonxoff and 'active' or 'inactive'))
sys.stderr.write('--- hardware flow control: %s\n' % (self.serial.rtscts and 'active' or 'inactive'))
- sys.stderr.write('--- data escaping: %s linefeed: %s\n' % (
- REPR_MODES[self.repr_mode],
- LF_MODES[self.convert_outgoing]))
+ #~ sys.stderr.write('--- data escaping: %s linefeed: %s\n' % (
+ #~ REPR_MODES[self.repr_mode],
+ #~ LF_MODES[self.convert_outgoing]))
+ sys.stderr.write('--- serial input encoding: %s\n' % (self.input_encoding,))
+ sys.stderr.write('--- serial output encoding: %s\n' % (self.output_encoding,))
+ sys.stderr.write('--- transformations: %s\n' % ' '.join(self.transformation_names))
def reader(self):
"""loop and copy serial->console"""
try:
while self.alive and self._reader_alive:
- data = self.serial.read(1)
-
- if self.repr_mode == 0:
- # direct output, just have to care about newline setting
- if data == b'\r' and self.convert_outgoing == CONVERT_CR:
- self.console.write(b'\n')
+ data = self.serial.read(1) + self.serial.read(self.serial.inWaiting())
+ if data:
+ if self.raw:
+ self.console.write_bytes(data)
else:
- self.console.write(data)
- elif self.repr_mode == 1:
- # escape non-printable, let pass newlines
- if self.convert_outgoing == CONVERT_CRLF and data in '\r\n':
- if data == b'\n':
- self.console.write(b'\n')
- elif data == b'\r':
- pass
- elif data == b'\n' and self.convert_outgoing == CONVERT_LF:
- self.console.write(b'\n')
- elif data == b'\r' and self.convert_outgoing == CONVERT_CR:
- self.console.write(b'\n')
- else:
- self.console.write(repr(data)[1:-1])
- elif self.repr_mode == 2:
- # escape all non-printable, including newline
- self.console.write(repr(data)[1:-1])
- elif self.repr_mode == 3:
- # escape everything (hexdump)
- for c in data:
- self.console.write(b"%s " % c.encode('hex'))
+ text = codecs.decode(
+ data,
+ self.input_encoding,
+ self.input_error_handling)
+ for transformation in self.transformations:
+ text = transformation.input(text)
+ self.console.write(text)
except serial.SerialException as e:
self.alive = False
- # would be nice if the console reader could be interruptted at this
- # point...
+ # XXX would be nice if the writer could be interrupted at this
+ # point... to exit completely
raise
@@ -300,10 +410,14 @@
try:
c = self.console.getkey()
except KeyboardInterrupt:
- c = b'\x03'
+ c = '\x03'
if menu_active:
if c == MENUCHARACTER or c == EXITCHARCTER: # Menu character again/exit char -> send itself
- self.serial.write(b) # send character
+ b = codecs.encode(
+ c,
+ self.output_encoding,
+ self.output_error_handling)
+ self.serial.write(b)
if self.echo:
self.console.write(c)
elif c == b'\x15': # CTRL+U -> upload file
@@ -327,41 +441,28 @@
except IOError as e:
sys.stderr.write('--- ERROR opening file %s: %s ---\n' % (filename, e))
self.console.setup()
- elif c in b'\x08hH?': # CTRL+H, h, H, ? -> Show help
+ elif c in '\x08hH?': # CTRL+H, h, H, ? -> Show help
sys.stderr.write(get_help_text())
- elif c == b'\x12': # CTRL+R -> Toggle RTS
+ elif c == '\x12': # CTRL+R -> Toggle RTS
self.rts_state = not self.rts_state
self.serial.setRTS(self.rts_state)
sys.stderr.write('--- RTS %s ---\n' % (self.rts_state and 'active' or 'inactive'))
- elif c == b'\x04': # CTRL+D -> Toggle DTR
+ elif c == '\x04': # CTRL+D -> Toggle DTR
self.dtr_state = not self.dtr_state
self.serial.setDTR(self.dtr_state)
sys.stderr.write('--- DTR %s ---\n' % (self.dtr_state and 'active' or 'inactive'))
- elif c == b'\x02': # CTRL+B -> toggle BREAK condition
+ elif c == '\x02': # CTRL+B -> toggle BREAK condition
self.break_state = not self.break_state
self.serial.setBreak(self.break_state)
sys.stderr.write('--- BREAK %s ---\n' % (self.break_state and 'active' or 'inactive'))
- elif c == b'\x05': # CTRL+E -> toggle local echo
+ elif c == '\x05': # CTRL+E -> toggle local echo
self.echo = not self.echo
sys.stderr.write('--- local echo %s ---\n' % (self.echo and 'active' or 'inactive'))
- elif c == b'\x09': # CTRL+I -> info
+ elif c == '\x09': # CTRL+I -> info
self.dump_port_settings()
- elif c == b'\x01': # CTRL+A -> cycle escape mode
- self.repr_mode += 1
- if self.repr_mode > 3:
- self.repr_mode = 0
- sys.stderr.write('--- escape data: %s ---\n' % (
- REPR_MODES[self.repr_mode],
- ))
- elif c == b'\x0c': # CTRL+L -> cycle linefeed mode
- self.convert_outgoing += 1
- if self.convert_outgoing > 2:
- self.convert_outgoing = 0
- self.newline = NEWLINE_CONVERISON_MAP[self.convert_outgoing]
- sys.stderr.write('--- line feed %s ---\n' % (
- LF_MODES[self.convert_outgoing],
- ))
- elif c in b'pP': # P -> change port
+ #~ elif c == '\x01': # CTRL+A -> cycle escape mode
+ #~ elif c == '\x0c': # CTRL+L -> cycle linefeed mode
+ elif c in 'pP': # P -> change port
dump_port_list()
sys.stderr.write('--- Enter port name: ')
sys.stderr.flush()
@@ -377,13 +478,7 @@
# save settings
settings = self.serial.getSettingsDict()
try:
- try:
- new_serial = serial.serial_for_url(port, do_not_open=True)
- except AttributeError:
- # happens when the installed pyserial is older than 2.5. use the
- # Serial class directly then.
- new_serial = serial.Serial()
- new_serial.port = port
+ new_serial = serial.serial_for_url(port, do_not_open=True)
# restore settings and open
new_serial.applySettingsDict(settings)
new_serial.open()
@@ -399,7 +494,7 @@
sys.stderr.write('--- Port changed to: %s ---\n' % (self.serial.port,))
# and restart the reader thread
self._start_reader()
- elif c in b'bB': # B -> change baudrate
+ elif c in 'bB': # B -> change baudrate
sys.stderr.write('\n--- Baudrate: ')
sys.stderr.flush()
self.console.cleanup()
@@ -412,58 +507,64 @@
else:
self.dump_port_settings()
self.console.setup()
- elif c == b'8': # 8 -> change to 8 bits
+ elif c == '8': # 8 -> change to 8 bits
self.serial.bytesize = serial.EIGHTBITS
self.dump_port_settings()
- elif c == b'7': # 7 -> change to 8 bits
+ elif c == '7': # 7 -> change to 8 bits
self.serial.bytesize = serial.SEVENBITS
self.dump_port_settings()
- elif c in b'eE': # E -> change to even parity
+ elif c in 'eE': # E -> change to even parity
self.serial.parity = serial.PARITY_EVEN
self.dump_port_settings()
- elif c in b'oO': # O -> change to odd parity
+ elif c in 'oO': # O -> change to odd parity
self.serial.parity = serial.PARITY_ODD
self.dump_port_settings()
- elif c in b'mM': # M -> change to mark parity
+ elif c in 'mM': # M -> change to mark parity
self.serial.parity = serial.PARITY_MARK
self.dump_port_settings()
- elif c in b'sS': # S -> change to space parity
+ elif c in 'sS': # S -> change to space parity
self.serial.parity = serial.PARITY_SPACE
self.dump_port_settings()
- elif c in b'nN': # N -> change to no parity
+ elif c in 'nN': # N -> change to no parity
self.serial.parity = serial.PARITY_NONE
self.dump_port_settings()
- elif c == b'1': # 1 -> change to 1 stop bits
+ elif c == '1': # 1 -> change to 1 stop bits
self.serial.stopbits = serial.STOPBITS_ONE
self.dump_port_settings()
- elif c == b'2': # 2 -> change to 2 stop bits
+ elif c == '2': # 2 -> change to 2 stop bits
self.serial.stopbits = serial.STOPBITS_TWO
self.dump_port_settings()
- elif c == b'3': # 3 -> change to 1.5 stop bits
+ elif c == '3': # 3 -> change to 1.5 stop bits
self.serial.stopbits = serial.STOPBITS_ONE_POINT_FIVE
self.dump_port_settings()
- elif c in b'xX': # X -> change software flow control
- self.serial.xonxoff = (c == b'X')
+ elif c in 'xX': # X -> change software flow control
+ self.serial.xonxoff = (c == 'X')
self.dump_port_settings()
- elif c in b'rR': # R -> change hardware flow control
- self.serial.rtscts = (c == b'R')
+ elif c in 'rR': # R -> change hardware flow control
+ self.serial.rtscts = (c == 'R')
self.dump_port_settings()
else:
sys.stderr.write('--- unknown menu character %s --\n' % key_description(c))
menu_active = False
elif c == MENUCHARACTER: # next char will be for menu
menu_active = True
- elif c == EXITCHARCTER:
+ elif c == EXITCHARCTER:
self.stop()
break # exit app
- elif c == b'\n':
- self.serial.write(self.newline) # send newline character(s)
- if self.echo:
- sys.console.write(c) # local echo is a real newline in any case
else:
- self.serial.write(c) # send byte
+ #~ if self.raw:
+ text = c
+ echo_text = text
+ for transformation in self.transformations:
+ text = transformation.output(text)
+ echo_text = transformation.echo(echo_text)
+ b = codecs.encode(
+ text,
+ self.output_encoding,
+ self.output_error_handling)
+ self.serial.write(b)
if self.echo:
- self.console.write(c)
+ self.console.write(echo_text)
except:
self.alive = False
raise
@@ -540,6 +641,13 @@
default = False
)
+ group.add_option("-t", "--transformation",
+ dest = "transformations",
+ action = "append",
+ help = "Add text transformation",
+ default = []
+ )
+
group.add_option("--cr",
dest = "cr",
action = "store_true",
@@ -554,21 +662,15 @@
default = False
)
- group.add_option("-D", "--debug",
- dest = "repr_mode",
- action = "count",
- help = """debug received data (escape non-printable chars)
---debug can be given multiple times:
-0: just print what is received
-1: escape non-printable characters, do newlines as unusual
-2: escape non-printable characters, newlines too
-3: hex dump everything""",
- default = 0
+ group.add_option("--raw",
+ dest = "raw",
+ action = "store_true",
+ help = "Do no apply any encodings/transformations",
+ default = False
)
parser.add_option_group(group)
-
group = optparse.OptionGroup(parser, "Hotkeys")
group.add_option("--exit-char",
@@ -605,8 +707,6 @@
default = False
)
-
-
parser.add_option_group(group)
@@ -623,8 +723,8 @@
parser.error('--exit-char can not be the same as --menu-char')
global EXITCHARCTER, MENUCHARACTER
- EXITCHARCTER = serial.to_bytes([options.exit_char])
- MENUCHARACTER = serial.to_bytes([options.menu_char])
+ EXITCHARCTER = chr(options.exit_char)
+ MENUCHARACTER = chr(options.menu_char)
port = options.port
baudrate = options.baudrate
@@ -646,11 +746,22 @@
dump_port_list()
port = raw_input('Enter port name:')
- convert_outgoing = CONVERT_CRLF
+ if options.transformations:
+ if 'help' in options.transformations:
+ sys.stderr.write('Available Transformations:\n')
+ sys.stderr.write('\n'.join('{:<20} = {.__doc__}'.format(k,v) for k,v in sorted(TRANSFORMATIONS.items())))
+ sys.stderr.write('\n')
+ sys.exit(1)
+ transformations = options.transformations
+ else:
+ transformations = ['default']
+
if options.cr:
- convert_outgoing = CONVERT_CR
+ transformations.append('cr')
elif options.lf:
- convert_outgoing = CONVERT_LF
+ transformations.append('lf')
+ else:
+ transformations.append('crlf')
try:
miniterm = Miniterm(
@@ -660,9 +771,9 @@
rtscts=options.rtscts,
xonxoff=options.xonxoff,
echo=options.echo,
- convert_outgoing=convert_outgoing,
- repr_mode=options.repr_mode,
+ transformations=transformations,
)
+ miniterm.raw = options.raw
except serial.SerialException as e:
sys.stderr.write("could not open port %r: %s\n" % (port, e))
if options.develop:
@@ -703,7 +814,6 @@
if not options.quiet:
sys.stderr.write("\n--- exit ---\n")
miniterm.join()
- #~ console.cleanup()
# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
if __name__ == '__main__':