blob: b3238db4e38a4080c34a2109bc5148fbb888655b [file] [log] [blame]
Chris Liechtid07a9e72015-08-22 02:58:47 +02001#!/usr/bin/env python3
2#
Chris Liechti3e02f702015-12-16 23:06:04 +01003# Experimental implementation of asyncio support.
Chris Liechtid07a9e72015-08-22 02:58:47 +02004#
Chris Liechti3e02f702015-12-16 23:06:04 +01005# This file is part of pySerial. https://github.com/pyserial/pyserial
Chris Liechtid07a9e72015-08-22 02:58:47 +02006# (C) 2015 Chris Liechti <cliechti@gmx.net>
7#
8# SPDX-License-Identifier: BSD-3-Clause
9"""\
10Support asyncio with serial ports. EXPERIMENTAL
11
12Posix platforms only, Python 3.4+ only.
13
Windows event loops cannot wait for serial ports with the current
implementation, although it should be possible to get that working.
16"""
17import asyncio
18import serial
Chris Liechti033f17c2015-08-30 21:28:04 +020019
Chris Liechtid07a9e72015-08-22 02:58:47 +020020
class SerialTransport(asyncio.Transport):
    """An asyncio transport model of a serial communication channel.

    A transport class is an abstraction of a communication channel.
    This allows protocol implementations to be developed against the
    transport abstraction without needing to know the details of the
    underlying channel, such as whether it is a pipe, a socket, or
    indeed a serial port.

    You generally won't instantiate a transport yourself; instead, you
    will call `create_serial_connection` which will create the
    transport and try to initiate the underlying communication channel,
    calling you back when it succeeds.
    """

    def __init__(self, loop, protocol, serial_instance):
        """Bind an already opened serial port to *protocol* on *loop*.

        The port is switched to non-blocking operation (zero read and
        write timeouts). ``protocol.connection_made(self)`` and the
        registration of the read callback are scheduled on the loop, in
        that order; nothing is called synchronously.

        loop -- event loop used for callbacks and fd watching
        protocol -- object receiving the asyncio protocol callbacks
        serial_instance -- open serial port object; its ``fd``
            attribute is registered with the loop
        """
        super().__init__()
        self._loop = loop
        self._protocol = protocol
        self._serial = serial_instance
        self._closing = False
        self._protocol_paused = False
        self._max_read_size = 1024
        self._write_buffer = []
        self._set_write_buffer_limits()
        self._has_reader = False
        self._has_writer = False
        # Guards against delivering connection_lost() more than once,
        # e.g. when abort() follows close().
        self._connection_lost_scheduled = False

        # XXX how to support url handlers too

        # Asynchronous I/O requires non-blocking devices
        self._serial.timeout = 0
        self._serial.write_timeout = 0
        self._serial.nonblocking()

        # These two callbacks will be enqueued in a FIFO queue by asyncio
        loop.call_soon(protocol.connection_made, self)
        loop.call_soon(self._ensure_reader)

    @property
    def serial(self):
        """The underlying Serial instance (None once the port is closed)."""
        return self._serial

    def __repr__(self):
        return '{self.__class__.__name__}({self._loop}, {self._protocol}, {self.serial})'.format(self=self)

    def is_closing(self):
        """Return True if the transport is closing or closed."""
        return self._closing

    def close(self):
        """Close the transport gracefully.

        Any buffered data will be written asynchronously. No more data
        will be received and further writes will be silently ignored.
        After all buffered data is flushed, the protocol's
        connection_lost() method will be called with None as its
        argument. Calling close() on a closing transport does nothing.
        """
        if not self._closing:
            self._close(None)

    def _read_ready(self):
        """Reader callback: pass newly received bytes to the protocol.

        Reads at most ``_max_read_size`` bytes. Empty reads are dropped.
        A SerialException starts closing the transport with that
        exception.
        """
        try:
            data = self._serial.read(self._max_read_size)
        except serial.SerialException as e:
            self._close(exc=e)
        else:
            if data:
                self._protocol.data_received(data)

    def write(self, data):
        """Write some data to the transport.

        This method does not block; it buffers the data and arranges
        for it to be sent out asynchronously. Writes made after the
        transport has been closed will be ignored.

        If nothing is buffered, an immediate write is attempted first;
        whatever the port does not accept right away (possibly all of
        it) is queued and sent from the writer callback.
        """
        if self._closing:
            return

        if self.get_write_buffer_size() == 0:
            # Attempt to send it right away first
            try:
                n = self._serial.write(data)
            except (BlockingIOError, InterruptedError):
                # Port not ready: nothing was sent, queue everything.
                n = 0
            except serial.SerialException as exc:
                self._fatal_error(exc, 'Fatal write error on serial transport')
                return
            if n == len(data):
                return  # Whole request satisfied
            # A non-blocking write may legitimately accept zero bytes,
            # so the remainder is buffered rather than asserted on.
            data = data[n:]
            self._ensure_writer()

        self._write_buffer.append(data)
        self._maybe_pause_protocol()

    def can_write_eof(self):
        """Serial ports do not support the concept of end-of-file.

        Always returns False.
        """
        return False

    def pause_reading(self):
        """Pause the receiving end of the transport.

        No data will be passed to the protocol's data_received() method
        until resume_reading() is called.
        """
        self._remove_reader()

    def resume_reading(self):
        """Resume the receiving end of the transport.

        Incoming data will be passed to the protocol's data_received()
        method until pause_reading() is called. Has no effect once the
        transport is closing.
        """
        self._ensure_reader()

    def set_write_buffer_limits(self, high=None, low=None):
        """Set the high- and low-water limits for write flow control.

        These two values control when the protocol's pause_writing()
        and resume_writing() methods are called. If specified, the
        low-water limit must be less than or equal to the high-water
        limit. Neither high nor low can be negative.

        Raises ValueError if the limits are inconsistent.
        """
        self._set_write_buffer_limits(high=high, low=low)
        self._maybe_pause_protocol()

    def get_write_buffer_size(self):
        """The number of bytes in the write buffer.

        This buffer is unbounded, so the result may be larger than
        the high water mark.
        """
        return sum(map(len, self._write_buffer))

    def write_eof(self):
        """Unsupported: always raises NotImplementedError."""
        raise NotImplementedError("Serial connections do not support end-of-file")

    def abort(self):
        """Close the transport immediately.

        Pending operations will not be given opportunity to complete,
        and buffered data will be lost. No more data will be received
        and further writes will be ignored. The protocol's
        connection_lost() method will eventually be called, unless it
        has already been scheduled or delivered.
        """
        self._abort(None)

    def _maybe_pause_protocol(self):
        """To be called whenever the write-buffer size increases.

        Tests the current write-buffer size against the high water
        mark configured for this transport. If the high water mark is
        exceeded, the protocol is instructed to pause_writing().
        Exceptions raised by the protocol are reported to the loop's
        exception handler instead of propagating.
        """
        if self.get_write_buffer_size() <= self._high_water:
            return
        if not self._protocol_paused:
            self._protocol_paused = True
            try:
                self._protocol.pause_writing()
            except Exception as exc:
                self._loop.call_exception_handler({
                    'message': 'protocol.pause_writing() failed',
                    'exception': exc,
                    'transport': self,
                    'protocol': self._protocol,
                })

    def _maybe_resume_protocol(self):
        """To be called whenever the write-buffer size decreases.

        Tests the current write-buffer size against the low water
        mark configured for this transport. If the protocol is paused
        and the write-buffer size is at or below the low water mark,
        the protocol is instructed that it can resume_writing().
        Exceptions raised by the protocol are reported to the loop's
        exception handler instead of propagating.
        """
        if (self._protocol_paused and
                self.get_write_buffer_size() <= self._low_water):
            self._protocol_paused = False
            try:
                self._protocol.resume_writing()
            except Exception as exc:
                self._loop.call_exception_handler({
                    'message': 'protocol.resume_writing() failed',
                    'exception': exc,
                    'transport': self,
                    'protocol': self._protocol,
                })

    def _write_ready(self):
        """Asynchronously write buffered data.

        This method is called back asynchronously as a writer
        registered with the asyncio event-loop against the
        underlying file descriptor for the serial port.

        Should the write-buffer become empty if this method
        is invoked while the transport is closing, the protocol's
        connection_lost() method will be called with None as its
        argument.
        """
        data = b''.join(self._write_buffer)
        assert data, 'Write buffer should not be empty'

        self._write_buffer.clear()

        try:
            n = self._serial.write(data)
        except (BlockingIOError, InterruptedError):
            # Nothing was sent; keep everything for the next callback.
            self._write_buffer.append(data)
        except serial.SerialException as exc:
            self._fatal_error(exc, 'Fatal write error on serial transport')
        else:
            if n == len(data):
                assert self._flushed()
                self._remove_writer()
                self._maybe_resume_protocol()  # May cause further writes
                # _write_ready may have been invoked by the event loop
                # after the transport was closed, as part of the ongoing
                # process of flushing buffered data. If the buffer
                # is now empty, we can close the connection
                if self._closing and self._flushed():
                    self._close()
                return

            # Partial write (possibly zero bytes): keep the unsent tail
            # and try again on the next writer callback.
            self._write_buffer.append(data[n:])
            self._maybe_resume_protocol()
            assert self._has_writer

    def _ensure_reader(self):
        """Register the read callback unless registered or closing."""
        if (not self._has_reader) and (not self._closing):
            self._loop.add_reader(self._serial.fd, self._read_ready)
            self._has_reader = True

    def _remove_reader(self):
        """Unregister the read callback if it is registered."""
        if self._has_reader:
            self._loop.remove_reader(self._serial.fd)
            self._has_reader = False

    def _ensure_writer(self):
        """Register the write callback unless registered or closing."""
        if (not self._has_writer) and (not self._closing):
            self._loop.add_writer(self._serial.fd, self._write_ready)
            self._has_writer = True

    def _remove_writer(self):
        """Unregister the write callback if it is registered."""
        if self._has_writer:
            self._loop.remove_writer(self._serial.fd)
            self._has_writer = False

    def _set_write_buffer_limits(self, high=None, low=None):
        """Ensure consistent write-buffer limits.

        Missing values are derived from each other: high defaults to
        64 KiB (or four times low when only low is given) and low
        defaults to a quarter of high. Raises ValueError unless
        high >= low >= 0.
        """
        if high is None:
            high = 64*1024 if low is None else 4*low
        if low is None:
            low = high // 4
        if not high >= low >= 0:
            raise ValueError('high (%r) must be >= low (%r) must be >= 0' %
                             (high, low))
        self._high_water = high
        self._low_water = low

    def _fatal_error(self, exc, message='Fatal error on serial transport'):
        """Report a fatal error to the event-loop and abort the transport."""
        self._loop.call_exception_handler({
            'message': message,
            'exception': exc,
            'transport': self,
            'protocol': self._protocol,
        })
        self._abort(exc)

    def _flushed(self):
        """True if the write buffer is empty, otherwise False."""
        return self.get_write_buffer_size() == 0

    def _schedule_connection_lost(self, exc):
        """Schedule the final teardown on the loop, at most once.

        Later requests are ignored, so the protocol never receives
        connection_lost() twice and the torn-down transport is never
        touched again.
        """
        if self._connection_lost_scheduled:
            return
        self._connection_lost_scheduled = True
        self._loop.call_soon(self._call_connection_lost, exc)

    def _close(self, exc=None):
        """Close the transport gracefully.

        If the write buffer is already empty, writing will be
        stopped immediately and a call to the protocol's
        connection_lost() method scheduled.

        If the write buffer is not already empty, the
        asynchronous writing will continue, and the _write_ready
        method will call this _close method again when the
        buffer has been flushed completely.
        """
        self._closing = True
        self._remove_reader()
        if self._flushed():
            self._remove_writer()
            self._schedule_connection_lost(exc)

    def _abort(self, exc):
        """Close the transport immediately.

        Pending operations will not be given opportunity to complete,
        and buffered data will be lost. No more data will be received
        and further writes will be ignored. The protocol's
        connection_lost() method will eventually be called with the
        passed exception, unless teardown was already scheduled.
        """
        self._closing = True
        self._remove_reader()
        self._remove_writer()  # Pending buffered data will not be written
        self._schedule_connection_lost(exc)

    def _call_connection_lost(self, exc):
        """Close the connection.

        Informs the protocol through connection_lost() and clears
        pending buffers and closes the serial connection. The port is
        closed and the references are dropped even if flushing or the
        protocol callback raises.
        """
        assert self._closing
        assert not self._has_writer
        assert not self._has_reader
        try:
            try:
                self._serial.flush()
            except Exception as flush_exc:
                # A failing flush must not prevent the protocol from
                # being notified nor the port from being closed.
                self._loop.call_exception_handler({
                    'message': 'serial flush() failed while closing transport',
                    'exception': flush_exc,
                    'transport': self,
                    'protocol': self._protocol,
                })
            self._protocol.connection_lost(exc)
        finally:
            self._write_buffer.clear()
            self._serial.close()
            self._serial = None
            self._protocol = None
            self._loop = None
Chris Liechtid07a9e72015-08-22 02:58:47 +0200355
Chris Liechti033f17c2015-08-30 21:28:04 +0200356
# NOTE(review): asyncio.coroutine was removed in Python 3.11; it is kept
# because this module declares support for Python 3.4+ -- confirm which
# interpreter versions must be supported before converting to async def.
@asyncio.coroutine
def create_serial_connection(loop, protocol_factory, *args, **kwargs):
    """Open a serial port and connect a protocol to it.

    This function is a coroutine.

    loop -- event loop the transport registers its callbacks with
    protocol_factory -- callable without arguments returning the
        protocol instance
    *args, **kwargs -- passed unchanged to serial.serial_for_url()

    Returns a (transport, protocol) tuple. If creating the protocol or
    the transport fails, the port that was just opened is closed again
    before the exception propagates.
    """
    ser = serial.serial_for_url(*args, **kwargs)
    try:
        protocol = protocol_factory()
        transport = SerialTransport(loop, protocol, ser)
    except Exception:
        # Do not leak the already opened port when setup fails.
        ser.close()
        raise
    return (transport, protocol)
363
Robert Smallshire981a3212016-03-22 21:57:02 +0100364
@asyncio.coroutine
def open_serial_connection(*,
                           loop=None,
                           limit=asyncio.streams._DEFAULT_LIMIT,
                           **kwargs):
    """Open a serial port and return a (reader, writer) stream pair.

    A wrapper for create_serial_connection(): the reader returned is a
    StreamReader instance and the writer is a StreamWriter instance.

    All arguments are keyword-only. ``loop`` selects the event loop
    (the result of asyncio.get_event_loop() when omitted) and ``limit``
    is the buffer limit passed to the StreamReader. Every other keyword
    argument is forwarded to create_serial_connection().

    This function is a coroutine.
    """
    event_loop = asyncio.get_event_loop() if loop is None else loop
    stream_reader = asyncio.StreamReader(limit=limit, loop=event_loop)
    stream_protocol = asyncio.StreamReaderProtocol(stream_reader,
                                                   loop=event_loop)

    def existing_protocol():
        # The factory hands back the protocol already wired to the reader.
        return stream_protocol

    transport, _ = yield from create_serial_connection(
        loop=event_loop,
        protocol_factory=existing_protocol,
        **kwargs)
    stream_writer = asyncio.StreamWriter(
        transport, stream_protocol, stream_reader, event_loop)
    return stream_reader, stream_writer
393
394
Chris Liechtid07a9e72015-08-22 02:58:47 +0200395# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
396# test
if __name__ == '__main__':
    class Output(asyncio.Protocol):
        """Demo protocol: send a greeting and close after the first newline."""

        def connection_made(self, transport):
            # Keep the transport so the other callbacks can use it.
            self.transport = transport
            print('port opened', transport)
            transport.serial.rts = False
            transport.write(b'hello world\n')

        def data_received(self, data):
            print('data received', repr(data))
            if b'\n' not in data:
                return
            self.transport.close()

        def connection_lost(self, exc):
            print('port closed')
            # Stopping the loop ends run_forever() below.
            asyncio.get_event_loop().stop()

        def pause_writing(self):
            print('pause writing')
            print(self.transport.get_write_buffer_size())

        def resume_writing(self):
            print(self.transport.get_write_buffer_size())
            print('resume writing')

    loop = asyncio.get_event_loop()
    coro = create_serial_connection(
        loop, Output, '/dev/ttyUSB0', baudrate=115200)
    loop.run_until_complete(coro)
    loop.run_forever()
    loop.close()