blob: faa00bc8a541de37c875f6c257dbe8888fa4f03e [file] [log] [blame]
Chris Liechtib5ccc1c2015-10-06 22:05:11 +02001#! /usr/bin/env python
2# encoding: utf-8
3"""
4Example of an AT command protocol.
5
6https://en.wikipedia.org/wiki/Hayes_command_set
7http://www.itu.int/rec/T-REC-V.250-200307-I/en
8"""
9from __future__ import print_function
10
11import sys
12sys.path.insert(0, '..')
13
14import logging
15import serial
16import serial.threaded
17import threading
18
19try:
20 import queue
21except ImportError:
22 import Queue as queue
23
24
class ATException(Exception):
    """Error raised for a failed AT command, e.g. when the reply times out."""
27
28
class ATProtocol(serial.threaded.LineReader):
    """
    Line based protocol handler for devices using AT commands.

    Received lines starting with '+' are treated as spontaneous events and
    are passed to handle_event() from a separate thread. All other lines are
    queued as responses for the command() call that is waiting for them.
    """

    TERMINATOR = b'\r\n'

    def __init__(self):
        super(ATProtocol, self).__init__()
        self.alive = True
        self.responses = queue.Queue()
        self.events = queue.Queue()
        # create all shared state before the worker thread starts running
        self.lock = threading.Lock()
        self._event_thread = threading.Thread(target=self._run_event)
        self._event_thread.daemon = True
        self._event_thread.name = 'at-event'
        self._event_thread.start()

    def connection_made(self, transport):
        """Remember the transport so that commands can be written to it."""
        super(ATProtocol, self).connection_made(transport)
        self.transport = transport

    def stop(self):
        """
        Stop the event processing thread, abort pending commands, if any.
        """
        self.alive = False
        # wake up both consumers: None ends _run_event, '<exit>' ends command()
        self.events.put(None)
        self.responses.put('<exit>')

    def _run_event(self):
        """
        Process events in a separate thread so that input thread is not
        blocked.
        """
        while self.alive:
            event = self.events.get()
            if event is None:
                # sentinel queued by stop(); it is not an event and must not
                # reach handle_event()
                break
            try:
                self.handle_event(event)
            except Exception:
                # a failing handler must not terminate event processing
                logging.exception('_run_event')

    def handle_line(self, line):
        """
        Handle input from serial port, check for events.
        """
        if line.startswith('+'):
            self.events.put(line)
        else:
            self.responses.put(line)

    def handle_event(self, event):
        """
        Spontaneous message received.
        """
        print('event received:', event)

    def command(self, command, response='OK', timeout=5):
        """
        Send an AT command and wait for the response.

        :param command: command text, sent with a trailing CR LF
        :param response: line that marks the end of the answer
        :param timeout: maximum time in seconds to wait for each line
        :returns: list of lines received before the final response line
        :raises ATException: if no line arrives within the timeout or the
            protocol is stopped while waiting
        """
        with self.lock:  # ensure that just one thread is sending commands at once
            # concatenate instead of using bytes %-formatting, which is not
            # available on Python 3.0 - 3.4
            self.transport.write(command.encode(self.ENCODING, self.UNICODE_HANDLING) + b'\r\n')
            lines = []
            while True:
                try:
                    line = self.responses.get(timeout=timeout)
                except queue.Empty:
                    raise ATException('AT command timeout (%r)' % (command,))
                if line == response:
                    return lines
                if line == '<exit>' and not self.alive:
                    # marker queued by stop(): give up instead of waiting
                    # for the timeout to expire
                    raise ATException('AT command aborted (%r)' % (command,))
                lines.append(line)
99
100
101# test
if __name__ == '__main__':
    import time

    class PAN1322(ATProtocol):
        """
        Example communication with PAN1322 BT module.

        Some commands do not respond with OK but with a '+...' line. This is
        implemented via command_with_event_response and handle_event, because
        '+...' lines are also used for real events.
        """

        def __init__(self):
            super(PAN1322, self).__init__()
            # results extracted from '+...' lines that answer a command
            self.event_responses = queue.Queue()
            # command currently waiting for a '+...' answer, None if idle
            self._awaiting_response_for = None

        def connection_made(self, transport):
            super(PAN1322, self).connection_made(transport)
            # our adapter enables the module with RTS=low
            self.transport.serial.rts = False
            # wait briefly, then drop whatever was received in the meantime
            time.sleep(0.3)
            self.transport.serial.reset_input_buffer()

        def handle_event(self, event):
            """Handle events and command responses starting with '+...'"""
            pending = self._awaiting_response_for
            if (event.startswith('+RRBDRES')
                    and pending is not None
                    and pending.startswith('AT+JRBD')):
                # the 12 hex digits of the address follow the 8 character
                # prefix and one separator character
                rev = event[9:9 + 12]
                if len(rev) != 12:
                    raise ValueError('malformed +RRBDRES event: %r' % (event,))
                # int(..., 16) validates the digits; this replaces the
                # Python 2 only str.decode('hex')
                octets = [int(rev[i:i + 2], 16) for i in range(0, 12, 2)]
                # the module reports the address with reversed byte order
                mac = ':'.join('%02X' % octet for octet in reversed(octets))
                self.event_responses.put(mac)
            else:
                logging.warning('unhandled event: %r', event)

        def command_with_event_response(self, command, timeout=None):
            """
            Send a command that responds with '+...' line.

            :param command: command text, sent with a trailing CR LF
            :param timeout: maximum time in seconds to wait for the answer,
                None (default) waits without limit
            :returns: the value that handle_event extracted from the answer
            :raises ATException: if no answer arrives within the timeout
            """
            with self.lock:  # ensure that just one thread is sending commands at once
                self._awaiting_response_for = command
                try:
                    self.transport.write(command.encode(self.ENCODING, self.UNICODE_HANDLING) + b'\r\n')
                    return self.event_responses.get(timeout=timeout)
                except queue.Empty:
                    raise ATException('AT command timeout (%r)' % (command,))
                finally:
                    # always clear the marker so that later events are not
                    # mistaken for an answer to this command
                    self._awaiting_response_for = None

        # - - - example commands

        def reset(self):
            """Perform a software reset of the BT module."""
            self.command("AT+JRES", response='ROK')

        def get_mac_address(self):
            """Return the address reported in the answer to AT+JRBD."""
            # requests hardware / calibration info as event
            return self.command_with_event_response("AT+JRBD")

    ser = serial.serial_for_url('spy://COM1', baudrate=115200, timeout=1)
    #~ ser = serial.Serial('COM1', baudrate=115200, timeout=1)
    with serial.threaded.SerialPortWorker(ser, PAN1322) as bt_module:
        bt_module.reset()
        print("reset OK")
        print("MAC address is", bt_module.get_mac_address())