blob: d9e881bfa3529aa08d5e9581e6820665a979a83f [file] [log] [blame]
Chris Liechtib5ccc1c2015-10-06 22:05:11 +02001#! /usr/bin/env python
2# encoding: utf-8
3"""
Example of an AT command protocol.
5
6https://en.wikipedia.org/wiki/Hayes_command_set
7http://www.itu.int/rec/T-REC-V.250-200307-I/en
8"""
9from __future__ import print_function
10
11import sys
12sys.path.insert(0, '..')
13
14import logging
15import serial
16import serial.threaded
17import threading
18
19try:
20 import queue
21except ImportError:
22 import Queue as queue
23
24
class ATException(Exception):
    """Error raised when an AT command fails, e.g. because it timed out."""
27
28
class ATProtocol(serial.threaded.LineReader):
    """
    Line based AT command protocol on top of ``serial.threaded.LineReader``.

    Received lines are sorted into two queues: lines starting with '+' are
    treated as events and are passed to ``handle_event`` from a separate
    thread, all other lines are treated as responses and are consumed by
    ``command``.
    """

    TERMINATOR = b'\r\n'

    def __init__(self):
        super(ATProtocol, self).__init__()
        self.alive = True
        self.responses = queue.Queue()
        self.events = queue.Queue()
        # create the lock before the worker thread is started so that all
        # attributes of this class exist once other threads are running
        self.lock = threading.Lock()
        self._event_thread = threading.Thread(target=self._run_event)
        self._event_thread.daemon = True
        self._event_thread.name = 'at-event'
        self._event_thread.start()

    def stop(self):
        """
        Stop the event processing thread, abort pending commands, if any.
        """
        self.alive = False
        # both queues get a wake-up marker so that threads blocked in get()
        # notice that ``alive`` has been cleared
        self.events.put(None)
        self.responses.put('<exit>')

    def _run_event(self):
        """
        Process events in a separate thread so that input thread is not
        blocked.
        """
        while self.alive:
            try:
                event = self.events.get()
                if event is None:
                    # wake-up marker queued by stop(), not a real event
                    continue
                self.handle_event(event)
            except Exception:
                # a failing handler must not terminate the event thread
                logging.exception('_run_event')

    def handle_line(self, line):
        """
        Handle input from serial port, check for events.

        Lines starting with '+' are queued as events, everything else is
        queued as a command response.
        """
        if line.startswith('+'):
            self.events.put(line)
        else:
            self.responses.put(line)

    def handle_event(self, event):
        """
        Spontaneous message received.

        Called from the event thread with the received line; this default
        implementation just prints it.
        """
        print('event received:', event)

    def command(self, command, response='OK', timeout=5):
        """
        Send an AT command and wait for the response.

        :param command: text of the command, sent via ``write_line``.
        :param response: line that marks the end of the answer.
        :param timeout: maximum time in seconds to wait for *each* line
            (it is not a limit for the command as a whole).
        :returns: list of the lines received before the ``response`` line.
        :raises ATException: if no line arrived within ``timeout`` or if the
            protocol was stopped while waiting.
        """
        with self.lock:  # ensure that just one thread is sending commands at once
            self.write_line(command)
            lines = []
            while True:
                try:
                    line = self.responses.get(timeout=timeout)
                except queue.Empty:
                    raise ATException('AT command timeout (%r)' % (command,))
                if line == response:
                    return lines
                if not self.alive:
                    # stop() was called: give up instead of collecting its
                    # wake-up marker and waiting for the timeout
                    raise ATException('AT command aborted (%r)' % (command,))
                lines.append(line)
95
96
97# test
if __name__ == '__main__':
    import time

    class PAN1322(ATProtocol):
        """
        Example communication with PAN1322 BT module.

        Some commands do not respond with OK but with a '+...' line. This is
        implemented via command_with_event_response and handle_event, because
        '+...' lines are also used for real events.
        """

        def __init__(self):
            super(PAN1322, self).__init__()
            # results extracted from '+...' lines by handle_event
            self.event_responses = queue.Queue()
            # command currently waiting for a '+...' answer, None if there is none
            self._awaiting_response_for = None

        def connection_made(self, transport):
            super(PAN1322, self).connection_made(transport)
            # our adapter enables the module with RTS=low
            self.transport.serial.rts = False
            time.sleep(0.3)
            self.transport.serial.reset_input_buffer()

        def handle_event(self, event):
            """Handle events and command responses starting with '+...'"""
            # read the attribute once: it is reset to None by the thread that
            # sends the command while this method runs in the event thread
            pending = self._awaiting_response_for
            if (event.startswith('+RRBDRES') and pending is not None
                    and pending.startswith('AT+JRBD')):
                # 12 hex digits follow the 9 character prefix; the bytes are
                # put into reverse order for the 'XX:XX:...' representation
                rev = event[9:9 + 12]
                mac = ':'.join('%02X' % b for b in reversed(bytearray.fromhex(rev)))
                self.event_responses.put(mac)
            else:
                logging.warning('unhandled event: %r', event)

        def command_with_event_response(self, command, timeout=None):
            """
            Send a command that responds with '+...' line

            :param command: text of the command, sent via ``write_line``.
            :param timeout: maximum time in seconds to wait for the answer,
                ``None`` (default) waits forever.
            :returns: the value that handle_event extracted from the answer.
            :raises ATException: if ``timeout`` expired without an answer.
            """
            with self.lock:  # ensure that just one thread is sending commands at once
                self._awaiting_response_for = command
                try:
                    self.write_line(command)
                    return self.event_responses.get(timeout=timeout)
                except queue.Empty:
                    raise ATException('AT command timeout (%r)' % (command,))
                finally:
                    # always clear the marker, also when sending or waiting failed
                    self._awaiting_response_for = None

        # - - - example commands

        def reset(self):
            """Trigger a software reset of the BT module and wait for 'ROK'."""
            self.command("AT+JRES", response='ROK')  # SW-Reset BT module

        def get_mac_address(self):
            """Return the value delivered by the event answering 'AT+JRBD'."""
            # requests hardware / calibration info as event
            return self.command_with_event_response("AT+JRBD")

    ser = serial.serial_for_url('spy://COM1', baudrate=115200, timeout=1)
    #~ ser = serial.Serial('COM1', baudrate=115200, timeout=1)
    with serial.threaded.SerialPortWorker(ser, PAN1322) as bt_module:
        bt_module.reset()
        print("reset OK")
        print("MAC address is", bt_module.get_mac_address())