miniterm: refactor key handler and extend suspend function
diff --git a/serial/tools/miniterm.py b/serial/tools/miniterm.py
index 634264e..c5f8b8c 100644
--- a/serial/tools/miniterm.py
+++ b/serial/tools/miniterm.py
@@ -502,25 +502,7 @@
if self.echo:
self.console.write(c)
elif c == '\x15': # CTRL+U -> upload file
- sys.stderr.write('\n--- File to upload: ')
- sys.stderr.flush()
- with self.console:
- filename = sys.stdin.readline().rstrip('\r\n')
- if filename:
- try:
- with open(filename, 'rb') as f:
- sys.stderr.write('--- Sending file {} ---\n'.format(filename))
- while True:
- block = f.read(1024)
- if not block:
- break
- self.serial.write(block)
- # Wait for output buffer to drain.
- self.serial.flush()
- sys.stderr.write('.') # Progress indicator.
- sys.stderr.write('\n--- File {} sent ---\n'.format(filename))
- except IOError as e:
- sys.stderr.write('--- ERROR opening file {}: {} ---\n'.format(filename, e))
+ self.upload_file()
elif c in '\x08hH?': # CTRL+H, h, H, ? -> Show help
sys.stderr.write(self.get_help_text())
elif c == '\x12': # CTRL+R -> Toggle RTS
@@ -536,22 +518,7 @@
self.echo = not self.echo
sys.stderr.write('--- local echo {} ---\n'.format('active' if self.echo else 'inactive'))
elif c == '\x06': # CTRL+F -> edit filters
- sys.stderr.write('\n--- Available Filters:\n')
- sys.stderr.write('\n'.join(
- '--- {:<10} = {.__doc__}'.format(k, v)
- for k, v in sorted(TRANSFORMATIONS.items())))
- sys.stderr.write('\n--- Enter new filter name(s) [{}]: '.format(' '.join(self.filters)))
- with self.console:
- new_filters = sys.stdin.readline().lower().split()
- if new_filters:
- for f in new_filters:
- if f not in TRANSFORMATIONS:
- sys.stderr.write('--- unknown filter: {}\n'.format(repr(f)))
- break
- else:
- self.filters = new_filters
- self.update_transformations()
- sys.stderr.write('--- filters: {}\n'.format(' '.join(self.filters)))
+ self.change_filter()
elif c == '\x0c': # CTRL+L -> EOL mode
modes = list(EOL_TRANSFORMATIONS) # keys
eol = modes.index(self.eol) + 1
@@ -561,82 +528,17 @@
sys.stderr.write('--- EOL: {} ---\n'.format(self.eol.upper()))
self.update_transformations()
elif c == '\x01': # CTRL+A -> set encoding
- sys.stderr.write('\n--- Enter new encoding name [{}]: '.format(self.input_encoding))
- with self.console:
- new_encoding = sys.stdin.readline().strip()
- if new_encoding:
- try:
- codecs.lookup(new_encoding)
- except LookupError:
- sys.stderr.write('--- invalid encoding name: {}\n'.format(new_encoding))
- else:
- self.set_rx_encoding(new_encoding)
- self.set_tx_encoding(new_encoding)
- sys.stderr.write('--- serial input encoding: {}\n'.format(self.input_encoding))
- sys.stderr.write('--- serial output encoding: {}\n'.format(self.output_encoding))
+ self.change_encoding()
elif c == '\x09': # CTRL+I -> info
self.dump_port_settings()
#~ elif c == '\x01': # CTRL+A -> cycle escape mode
#~ elif c == '\x0c': # CTRL+L -> cycle linefeed mode
elif c in 'pP': # P -> change port
- with self.console:
- try:
- port = ask_for_port()
- except KeyboardInterrupt:
- port = None
- if port and port != self.serial.port:
- # reader thread needs to be shut down
- self._stop_reader()
- # save settings
- settings = self.serial.getSettingsDict()
- try:
- new_serial = serial.serial_for_url(port, do_not_open=True)
- # restore settings and open
- new_serial.applySettingsDict(settings)
- new_serial.rts = self.serial.rts
- new_serial.dtr = self.serial.dtr
- new_serial.open()
- new_serial.break_condition = self.serial.break_condition
- except Exception as e:
- sys.stderr.write('--- ERROR opening new port: {} ---\n'.format(e))
- new_serial.close()
- else:
- self.serial.close()
- self.serial = new_serial
- sys.stderr.write('--- Port changed to: {} ---\n'.format(self.serial.port))
- # and restart the reader thread
- self._start_reader()
+ self.change_port()
elif c in 'sS': # S -> suspend / open port temporarily
- # reader thread needs to be shut down
- self._stop_reader()
- self.serial.close()
- sys.stderr.write('\n--- Port closed: {} ---\n'.format(self.serial.port))
- while not self.serial.is_open:
- sys.stderr.write('--- press {exit} to exit or any other key to reconnect ---\n'.format(
- exit=key_description(self.exit_character)))
- k = self.console.getkey()
- if k == self.exit_character:
- self.stop() # exit app
- break
- try:
- self.serial.open()
- except Exception as e:
- sys.stderr.write('--- ERROR opening port: {} ---\n'.format(e))
- # and restart the reader thread
- self._start_reader()
- sys.stderr.write('--- Port opened: {} ---\n'.format(self.serial.port))
+ self.suspend_port()
elif c in 'bB': # B -> change baudrate
- sys.stderr.write('\n--- Baudrate: ')
- sys.stderr.flush()
- with self.console:
- backup = self.serial.baudrate
- try:
- self.serial.baudrate = int(sys.stdin.readline().strip())
- except ValueError as e:
- sys.stderr.write('--- ERROR setting baudrate: {} ---\n'.format(e))
- self.serial.baudrate = backup
- else:
- self.dump_port_settings()
+ self.change_baudrate()
elif c == '8': # 8 -> change to 8 bits
self.serial.bytesize = serial.EIGHTBITS
self.dump_port_settings()
@@ -676,6 +578,138 @@
else:
sys.stderr.write('--- unknown menu character {} --\n'.format(key_description(c)))
+ def upload_file(self):
+ """Ask user for filenname and send its contents"""
+ sys.stderr.write('\n--- File to upload: ')
+ sys.stderr.flush()
+ with self.console:
+ filename = sys.stdin.readline().rstrip('\r\n')
+ if filename:
+ try:
+ with open(filename, 'rb') as f:
+ sys.stderr.write('--- Sending file {} ---\n'.format(filename))
+ while True:
+ block = f.read(1024)
+ if not block:
+ break
+ self.serial.write(block)
+ # Wait for output buffer to drain.
+ self.serial.flush()
+ sys.stderr.write('.') # Progress indicator.
+ sys.stderr.write('\n--- File {} sent ---\n'.format(filename))
+ except IOError as e:
+ sys.stderr.write('--- ERROR opening file {}: {} ---\n'.format(filename, e))
+
+ def change_filter(self):
+ """change the i/o transformations"""
+ sys.stderr.write('\n--- Available Filters:\n')
+ sys.stderr.write('\n'.join(
+ '--- {:<10} = {.__doc__}'.format(k, v)
+ for k, v in sorted(TRANSFORMATIONS.items())))
+ sys.stderr.write('\n--- Enter new filter name(s) [{}]: '.format(' '.join(self.filters)))
+ with self.console:
+ new_filters = sys.stdin.readline().lower().split()
+ if new_filters:
+ for f in new_filters:
+ if f not in TRANSFORMATIONS:
+ sys.stderr.write('--- unknown filter: {}\n'.format(repr(f)))
+ break
+ else:
+ self.filters = new_filters
+ self.update_transformations()
+ sys.stderr.write('--- filters: {}\n'.format(' '.join(self.filters)))
+
+ def change_encoding(self):
+ """change encoding on the serial port"""
+ sys.stderr.write('\n--- Enter new encoding name [{}]: '.format(self.input_encoding))
+ with self.console:
+ new_encoding = sys.stdin.readline().strip()
+ if new_encoding:
+ try:
+ codecs.lookup(new_encoding)
+ except LookupError:
+ sys.stderr.write('--- invalid encoding name: {}\n'.format(new_encoding))
+ else:
+ self.set_rx_encoding(new_encoding)
+ self.set_tx_encoding(new_encoding)
+ sys.stderr.write('--- serial input encoding: {}\n'.format(self.input_encoding))
+ sys.stderr.write('--- serial output encoding: {}\n'.format(self.output_encoding))
+
+ def change_baudrate(self):
+ """change the baudrate"""
+ sys.stderr.write('\n--- Baudrate: ')
+ sys.stderr.flush()
+ with self.console:
+ backup = self.serial.baudrate
+ try:
+ self.serial.baudrate = int(sys.stdin.readline().strip())
+ except ValueError as e:
+ sys.stderr.write('--- ERROR setting baudrate: {} ---\n'.format(e))
+ self.serial.baudrate = backup
+ else:
+ self.dump_port_settings()
+
+ def change_port(self):
+ """Have a conversation with the user to change the serial port"""
+ with self.console:
+ try:
+ port = ask_for_port()
+ except KeyboardInterrupt:
+ port = None
+ if port and port != self.serial.port:
+ # reader thread needs to be shut down
+ self._stop_reader()
+ # save settings
+ settings = self.serial.getSettingsDict()
+ try:
+ new_serial = serial.serial_for_url(port, do_not_open=True)
+ # restore settings and open
+ new_serial.applySettingsDict(settings)
+ new_serial.rts = self.serial.rts
+ new_serial.dtr = self.serial.dtr
+ new_serial.open()
+ new_serial.break_condition = self.serial.break_condition
+ except Exception as e:
+ sys.stderr.write('--- ERROR opening new port: {} ---\n'.format(e))
+ new_serial.close()
+ else:
+ self.serial.close()
+ self.serial = new_serial
+ sys.stderr.write('--- Port changed to: {} ---\n'.format(self.serial.port))
+ # and restart the reader thread
+ self._start_reader()
+
+ def suspend_port(self):
+ """\
+ open port temporarily, allow reconnect, exit and port change to get
+ out of the loop
+ """
+ # reader thread needs to be shut down
+ self._stop_reader()
+ self.serial.close()
+ sys.stderr.write('\n--- Port closed: {} ---\n'.format(self.serial.port))
+ do_change_port = False
+ while not self.serial.is_open:
+ sys.stderr.write('--- Quit: {exit} | p: port change | any other key to reconnect ---\n'.format(
+ exit=key_description(self.exit_character)))
+ k = self.console.getkey()
+ if k == self.exit_character:
+ self.stop() # exit app
+ break
+ elif k in 'pP':
+ do_change_port = True
+ break
+ try:
+ self.serial.open()
+ except Exception as e:
+ sys.stderr.write('--- ERROR opening port: {} ---\n'.format(e))
+ if do_change_port:
+ self.change_port()
+ else:
+ # and restart the reader thread
+ self._start_reader()
+ sys.stderr.write('--- Port opened: {} ---\n'.format(self.serial.port))
+
def get_help_text(self):
"""return the help text"""
# help text, starts with blank line!