Chris Liechti | c0c660a | 2015-08-25 00:55:51 +0200 | [diff] [blame] | 1 | """\ |
| 2 | Python 'hex' Codec - 2-digit hex with spaces content transfer encoding. |
| 3 | """ |
| 4 | |
| 5 | import codecs |
| 6 | import serial |
| 7 | |
| 8 | HEXDIGITS = '0123456789ABCDEF' |
| 9 | |
| 10 | ### Codec APIs |
| 11 | |
Chris Liechti | 033f17c | 2015-08-30 21:28:04 +0200 | [diff] [blame] | 12 | |
Chris Liechti | c0c660a | 2015-08-25 00:55:51 +0200 | [diff] [blame] | 13 | def hex_encode(input, errors='strict'): |
| 14 | return (serial.to_bytes([int(h, 16) for h in input.split()]), len(input)) |
| 15 | |
Chris Liechti | 033f17c | 2015-08-30 21:28:04 +0200 | [diff] [blame] | 16 | |
Chris Liechti | c0c660a | 2015-08-25 00:55:51 +0200 | [diff] [blame] | 17 | def hex_decode(input, errors='strict'): |
| 18 | return (''.join('{:02X} '.format(b) for b in input), len(input)) |
| 19 | |
Chris Liechti | 033f17c | 2015-08-30 21:28:04 +0200 | [diff] [blame] | 20 | |
Chris Liechti | c0c660a | 2015-08-25 00:55:51 +0200 | [diff] [blame] | 21 | class Codec(codecs.Codec): |
| 22 | def encode(self, input, errors='strict'): |
| 23 | return serial.to_bytes([int(h, 16) for h in input.split()]) |
Chris Liechti | 033f17c | 2015-08-30 21:28:04 +0200 | [diff] [blame] | 24 | |
Chris Liechti | c0c660a | 2015-08-25 00:55:51 +0200 | [diff] [blame] | 25 | def decode(self, input, errors='strict'): |
| 26 | return ''.join('{:02X} '.format(b) for b in input) |
| 27 | |
Chris Liechti | 033f17c | 2015-08-30 21:28:04 +0200 | [diff] [blame] | 28 | |
Chris Liechti | c0c660a | 2015-08-25 00:55:51 +0200 | [diff] [blame] | 29 | class IncrementalEncoder(codecs.IncrementalEncoder): |
| 30 | |
| 31 | def __init__(self, errors='strict'): |
| 32 | self.errors = errors |
| 33 | self.state = 0 |
| 34 | |
| 35 | def reset(self): |
| 36 | self.state = 0 |
| 37 | |
| 38 | def getstate(self): |
| 39 | return self.state |
| 40 | |
| 41 | def setstate(self, state): |
| 42 | self.state = state |
| 43 | |
| 44 | def encode(self, input, final=False): |
| 45 | state = self.state |
| 46 | encoded = [] |
| 47 | for c in input.upper(): |
| 48 | if c in HEXDIGITS: |
| 49 | z = HEXDIGITS.index(c) |
| 50 | if state: |
| 51 | encoded.append(z + (state & 0xf0)) |
| 52 | state = 0 |
| 53 | else: |
| 54 | state = 0x100 + (z << 4) |
| 55 | elif c == ' ': # allow spaces to separate values |
| 56 | if state and self.errors == 'strict': |
| 57 | raise UnicodeError('odd number of hex digits') |
| 58 | state = 0 |
| 59 | else: |
| 60 | if self.errors == 'strict': |
| 61 | raise UnicodeError('non-hex digit found: %r' % c) |
| 62 | self.state = state |
| 63 | return serial.to_bytes(encoded) |
| 64 | |
Chris Liechti | 033f17c | 2015-08-30 21:28:04 +0200 | [diff] [blame] | 65 | |
Chris Liechti | c0c660a | 2015-08-25 00:55:51 +0200 | [diff] [blame] | 66 | class IncrementalDecoder(codecs.IncrementalDecoder): |
| 67 | def decode(self, input, final=False): |
| 68 | return ''.join('{:02X} '.format(b) for b in input) |
| 69 | |
Chris Liechti | 033f17c | 2015-08-30 21:28:04 +0200 | [diff] [blame] | 70 | |
Chris Liechti | c0c660a | 2015-08-25 00:55:51 +0200 | [diff] [blame] | 71 | class StreamWriter(Codec, codecs.StreamWriter): |
| 72 | pass |
| 73 | |
Chris Liechti | 033f17c | 2015-08-30 21:28:04 +0200 | [diff] [blame] | 74 | |
Chris Liechti | c0c660a | 2015-08-25 00:55:51 +0200 | [diff] [blame] | 75 | class StreamReader(Codec, codecs.StreamReader): |
| 76 | pass |
| 77 | |
Chris Liechti | c0c660a | 2015-08-25 00:55:51 +0200 | [diff] [blame] | 78 | |
Chris Liechti | 033f17c | 2015-08-30 21:28:04 +0200 | [diff] [blame] | 79 | ### encodings module API |
Chris Liechti | c0c660a | 2015-08-25 00:55:51 +0200 | [diff] [blame] | 80 | def getregentry(): |
| 81 | return codecs.CodecInfo( |
| 82 | name='hexlify', |
| 83 | encode=hex_encode, |
| 84 | decode=hex_decode, |
| 85 | incrementalencoder=IncrementalEncoder, |
| 86 | incrementaldecoder=IncrementalDecoder, |
| 87 | streamwriter=StreamWriter, |
| 88 | streamreader=StreamReader, |
| 89 | _is_text_encoding=True, |
| 90 | ) |