blob: 85a1bb855d083fb2d5a3c29ca38e529023519aee [file] [log] [blame]
Chris Liechtid07a9e72015-08-22 02:58:47 +02001#!/usr/bin/env python3
2#
Chris Liechti3e02f702015-12-16 23:06:04 +01003# Experimental implementation of asyncio support.
Chris Liechtid07a9e72015-08-22 02:58:47 +02004#
Chris Liechti3e02f702015-12-16 23:06:04 +01005# This file is part of pySerial. https://github.com/pyserial/pyserial
Chris Liechtid07a9e72015-08-22 02:58:47 +02006# (C) 2015 Chris Liechti <cliechti@gmx.net>
7#
8# SPDX-License-Identifier: BSD-3-Clause
9"""\
10Support asyncio with serial ports. EXPERIMENTAL
11
12Posix platforms only, Python 3.4+ only.
13
14Windows event loops can not wait for serial ports with the current
15implementation. It should be possible to get that working though.
16"""
17import asyncio
18import serial
Chris Liechti033f17c2015-08-30 21:28:04 +020019
Chris Liechtid07a9e72015-08-22 02:58:47 +020020
21class SerialTransport(asyncio.Transport):
Robert Smallshire4f429c92016-03-30 18:48:57 +020022 """An asyncio transport model of a serial communication channel.
23
24 A transport class is an abstraction of a communication channel.
25 This allows protocol implementations to be developed against the
26 transport abstraction without needing to know the details of the
27 underlying channel, such as whether it is a pipe, a socket, or
28 indeed a serial port.
29
30
31 You generally won’t instantiate a transport yourself; instead, you
32 will call `create_serial_connection` which will create the
33 transport and try to initiate the underlying communication channel,
34 calling you back when it succeeds.
35 """
36
Chris Liechtid07a9e72015-08-22 02:58:47 +020037 def __init__(self, loop, protocol, serial_instance):
Robert Smallshire29187262016-03-22 22:15:39 +010038 super().__init__()
Chris Liechtid07a9e72015-08-22 02:58:47 +020039 self._loop = loop
40 self._protocol = protocol
Robert Smallshire4f429c92016-03-30 18:48:57 +020041 self._serial = serial_instance
Chris Liechtid07a9e72015-08-22 02:58:47 +020042 self._closing = False
Robert Smallshire4f429c92016-03-30 18:48:57 +020043 self._protocol_paused = False
44 self._max_read_size = 1024
45 self._write_buffer = []
46 self._set_write_buffer_limits()
47 self._has_reader = False
48 self._has_writer = False
49
Chris Liechtid07a9e72015-08-22 02:58:47 +020050 # XXX how to support url handlers too
Robert Smallshire4f429c92016-03-30 18:48:57 +020051
52 # Asynchronous I/O requires non-blocking devices
53 self._serial.timeout = 0
54 self._serial.write_timeout = 0
Robert Smallshire4f429c92016-03-30 18:48:57 +020055
56 # These two callbacks will be enqueued in a FIFO queue by asyncio
Chris Liechtid07a9e72015-08-22 02:58:47 +020057 loop.call_soon(protocol.connection_made, self)
Robert Smallshire4f429c92016-03-30 18:48:57 +020058 loop.call_soon(self._ensure_reader)
59
60 @property
61 def serial(self):
62 """The underlying Serial instance."""
63 return self._serial
Chris Liechtid07a9e72015-08-22 02:58:47 +020064
65 def __repr__(self):
66 return '{self.__class__.__name__}({self._loop}, {self._protocol}, {self.serial})'.format(self=self)
67
Robert Smallshire48141682016-03-22 21:07:36 +010068 def is_closing(self):
69 """Return True if the transport is closing or closed."""
70 return self._closing
71
Robert Smallshire4f429c92016-03-30 18:48:57 +020072 def close(self):
73 """Close the transport gracefully.
74
75 Any buffered data will be written asynchronously. No more data
76 will be received and further writes will be silently ignored.
77 After all buffered data is flushed, the protocol's
78 connection_lost() method will be called with None as its
79 argument.
80 """
81 if not self._closing:
82 self._close(None)
Chris Liechtid07a9e72015-08-22 02:58:47 +020083
84 def _read_ready(self):
Chris Liechti88e45ee2016-01-19 22:18:34 +010085 try:
Robert Smallshire4f429c92016-03-30 18:48:57 +020086 data = self._serial.read(self._max_read_size)
Chris Liechti88e45ee2016-01-19 22:18:34 +010087 except serial.SerialException as e:
Robert Smallshire4f429c92016-03-30 18:48:57 +020088 self._close(exc=e)
Chris Liechti88e45ee2016-01-19 22:18:34 +010089 else:
90 if data:
91 self._protocol.data_received(data)
Chris Liechtid07a9e72015-08-22 02:58:47 +020092
93 def write(self, data):
Robert Smallshire4f429c92016-03-30 18:48:57 +020094 """Write some data to the transport.
95
96 This method does not block; it buffers the data and arranges
97 for it to be sent out asynchronously. Writes made after the
98 transport has been closed will be ignored."""
99 if self._closing:
100 return
101
102 if self.get_write_buffer_size() == 0:
103 # Attempt to send it right away first
104 try:
105 n = self._serial.write(data)
106 except serial.SerialException as exc:
107 self._fatal_error(exc, 'Fatal write error on serial transport')
108 return
109 if n == len(data):
110 return # Whole request satisfied
111 assert n > 0
112 data = data[n:]
113 self._ensure_writer()
114
115 self._write_buffer.append(data)
116 self._maybe_pause_protocol()
Chris Liechtid07a9e72015-08-22 02:58:47 +0200117
118 def can_write_eof(self):
Robert Smallshire4f429c92016-03-30 18:48:57 +0200119 """Serial ports do not support the concept of end-of-file.
120
121 Always returns False.
122 """
Chris Liechtid07a9e72015-08-22 02:58:47 +0200123 return False
124
125 def pause_reading(self):
Robert Smallshire4f429c92016-03-30 18:48:57 +0200126 """Pause the receiving end of the transport.
127
128 No data will be passed to the protocol’s data_received() method
129 until resume_reading() is called.
130 """
131 self._remove_reader()
Chris Liechtid07a9e72015-08-22 02:58:47 +0200132
133 def resume_reading(self):
Robert Smallshire4f429c92016-03-30 18:48:57 +0200134 """Resume the receiving end of the transport.
Chris Liechtid07a9e72015-08-22 02:58:47 +0200135
Robert Smallshire4f429c92016-03-30 18:48:57 +0200136 Incoming data will be passed to the protocol's data_received()
137 method until pause_reading() is called.
138 """
139 self._ensure_reader()
140
141 def set_write_buffer_limits(self, high=None, low=None):
142 """Set the high- and low-water limits for write flow control.
143
144 These two values control when call the protocol’s
145 pause_writing()and resume_writing() methods are called. If
146 specified, the low-water limit must be less than or equal to
147 the high-water limit. Neither high nor low can be negative.
148 """
149 self._set_write_buffer_limits(high=high, low=low)
150 self._maybe_pause_protocol()
151
152 def get_write_buffer_size(self):
153 """The number of bytes in the write buffer.
154
155 This buffer is unbounded, so the result may be larger than the
156 the high water mark.
157 """
158 return sum(map(len, self._write_buffer))
159
160 def write_eof(self):
161 raise NotImplementedError("Serial connections do not support end-of-file")
162
163 def abort(self):
164 """Close the transport immediately.
165
166 Pending operations will not be given opportunity to complete,
167 and buffered data will be lost. No more data will be received
168 and further writes will be ignored. The protocol's
169 connection_lost() method will eventually be called.
170 """
171 self._abort(None)
172
173 def _maybe_pause_protocol(self):
174 """To be called whenever the write-buffer size increases.
175
176 Tests the current write-buffer size against the high water
177 mark configured for this transport. If the high water mark is
178 exceeded, the protocol is instructed to pause_writing().
179 """
180 if self.get_write_buffer_size() <= self._high_water:
181 return
182 if not self._protocol_paused:
183 self._protocol_paused = True
184 try:
185 self._protocol.pause_writing()
186 except Exception as exc:
187 self._loop.call_exception_handler({
188 'message': 'protocol.pause_writing() failed',
189 'exception': exc,
190 'transport': self,
191 'protocol': self._protocol,
192 })
193
194 def _maybe_resume_protocol(self):
195 """To be called whenever the write-buffer size decreases.
196
197 Tests the current write-buffer size against the low water
198 mark configured for this transport. If the write-buffer
199 size is below the low water mark, the protocol is
200 instructed that is can resume_writing().
201 """
202 if (self._protocol_paused and
203 self.get_write_buffer_size() <= self._low_water):
204 self._protocol_paused = False
205 try:
206 self._protocol.resume_writing()
207 except Exception as exc:
208 self._loop.call_exception_handler({
209 'message': 'protocol.resume_writing() failed',
210 'exception': exc,
211 'transport': self,
212 'protocol': self._protocol,
213 })
214
215 def _write_ready(self):
216 """Asynchronously write buffered data.
217
218 This method is called back asynchronously as a writer
219 registered with the asyncio event-loop against the
220 underlying file descriptor for the serial port.
221
222 Should the write-buffer become empty if this method
223 is invoked while the transport is closing, the protocol's
224 connection_lost() method will be called with None as its
225 argument.
226 """
227 data = b''.join(self._write_buffer)
228 num_bytes = len(data)
229 assert data, 'Write buffer should not be empty'
230
231 self._write_buffer.clear()
232
233 try:
234 n = self._serial.write(data)
235 except (BlockingIOError, InterruptedError):
236 self._write_buffer.append(data)
237 except serial.SerialException as exc:
238 self._fatal_error(exc, 'Fatal write error on serial transport')
239 else:
240 if n == len(data):
241 assert self._flushed()
242 self._remove_writer()
243 self._maybe_resume_protocol() # May cause further writes
244 # _write_ready may have been invoked by the event loop
245 # after the transport was closed, as part of the ongoing
246 # process of flushing buffered data. If the buffer
247 # is now empty, we can close the connection
248 if self._closing and self._flushed():
249 self._close()
250 return
251
252 assert n > 0
253 data = data[n:]
254 self._write_buffer.append(data) # Try again later
255 self._maybe_resume_protocol()
256 assert self._has_writer
257
258 def _ensure_reader(self):
259 if (not self._has_reader) and (not self._closing):
260 self._loop.add_reader(self._serial.fd, self._read_ready)
261 self._has_reader = True
262
263 def _remove_reader(self):
264 if self._has_reader:
265 self._loop.remove_reader(self._serial.fd)
266 self._has_reader = False
267
268 def _ensure_writer(self):
269 if (not self._has_writer) and (not self._closing):
270 self._loop.add_writer(self._serial.fd, self._write_ready)
271 self._has_writer = True
272
273 def _remove_writer(self):
274 if self._has_writer:
275 self._loop.remove_writer(self._serial.fd)
276 self._has_writer = False
277
278 def _set_write_buffer_limits(self, high=None, low=None):
279 """Ensure consistent write-buffer limits."""
280 if high is None:
Chris Liechti43b3b102016-06-07 21:31:47 +0200281 high = 64 * 1024 if low is None else 4 * low
Robert Smallshire4f429c92016-03-30 18:48:57 +0200282 if low is None:
283 low = high // 4
284 if not high >= low >= 0:
285 raise ValueError('high (%r) must be >= low (%r) must be >= 0' %
286 (high, low))
287 self._high_water = high
288 self._low_water = low
289
290 def _fatal_error(self, exc, message='Fatal error on serial transport'):
291 """Report a fatal error to the event-loop and abort the transport."""
292 self._loop.call_exception_handler({
293 'message': message,
294 'exception': exc,
295 'transport': self,
296 'protocol': self._protocol,
297 })
298 self._abort(exc)
299
300 def _flushed(self):
301 """True if the write buffer is empty, otherwise False."""
302 return self.get_write_buffer_size() == 0
303
304 def _close(self, exc=None):
305 """Close the transport gracefully.
306
307 If the write buffer is already empty, writing will be
308 stopped immediately and a call to the protocol's
309 connection_lost() method scheduled.
310
311 If the write buffer is not already empty, the
312 asynchronous writing will continue, and the _write_ready
313 method will call this _close method again when the
314 buffer has been flushed completely.
315 """
316 self._closing = True
317 self._remove_reader()
318 if self._flushed():
319 self._remove_writer()
320 self._loop.call_soon(self._call_connection_lost, exc)
321
322 def _abort(self, exc):
323 """Close the transport immediately.
324
325 Pending operations will not be given opportunity to complete,
326 and buffered data will be lost. No more data will be received
327 and further writes will be ignored. The protocol's
328 connection_lost() method will eventually be called with the
329 passed exception.
330 """
331 self._closing = True
332 self._remove_reader()
333 self._remove_writer() # Pending buffered data will not be written
334 self._loop.call_soon(self._call_connection_lost, exc)
335
336 def _call_connection_lost(self, exc):
337 """Close the connection.
338
339 Informs the protocol through connection_lost() and clears
340 pending buffers and closes the serial connection.
341 """
342 assert self._closing
343 assert not self._has_writer
344 assert not self._has_reader
345 self._serial.flush()
346 try:
347 self._protocol.connection_lost(exc)
348 finally:
349 self._write_buffer.clear()
350 self._serial.close()
351 self._serial = None
352 self._protocol = None
353 self._loop = None
Chris Liechtid07a9e72015-08-22 02:58:47 +0200354
Chris Liechti033f17c2015-08-30 21:28:04 +0200355
Chris Liechtid07a9e72015-08-22 02:58:47 +0200356@asyncio.coroutine
357def create_serial_connection(loop, protocol_factory, *args, **kwargs):
Chris Liechti35f927d2016-05-23 22:35:06 +0200358 ser = serial.serial_for_url(*args, **kwargs)
Chris Liechtid07a9e72015-08-22 02:58:47 +0200359 protocol = protocol_factory()
360 transport = SerialTransport(loop, protocol, ser)
361 return (transport, protocol)
362
Robert Smallshire981a3212016-03-22 21:57:02 +0100363
364@asyncio.coroutine
365def open_serial_connection(*,
366 loop=None,
367 limit=asyncio.streams._DEFAULT_LIMIT,
368 **kwargs):
369 """A wrapper for create_serial_connection() returning a (reader,
370 writer) pair.
371
372 The reader returned is a StreamReader instance; the writer is a
373 StreamWriter instance.
374
375 The arguments are all the usual arguments to Serial(). Additional
376 optional keyword arguments are loop (to set the event loop instance
377 to use) and limit (to set the buffer limit passed to the
378 StreamReader.
379
380 This function is a coroutine.
381 """
382 if loop is None:
383 loop = asyncio.get_event_loop()
384 reader = asyncio.StreamReader(limit=limit, loop=loop)
385 protocol = asyncio.StreamReaderProtocol(reader, loop=loop)
386 transport, _ = yield from create_serial_connection(
387 loop=loop,
388 protocol_factory=lambda: protocol,
389 **kwargs)
390 writer = asyncio.StreamWriter(transport, protocol, reader, loop)
391 return reader, writer
392
393
Chris Liechtid07a9e72015-08-22 02:58:47 +0200394# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
395# test
396if __name__ == '__main__':
397 class Output(asyncio.Protocol):
398 def connection_made(self, transport):
399 self.transport = transport
400 print('port opened', transport)
Chris Liechti0cd7d072015-09-04 23:04:53 +0200401 transport.serial.rts = False
Chris Liechtid07a9e72015-08-22 02:58:47 +0200402 transport.write(b'hello world\n')
403
404 def data_received(self, data):
405 print('data received', repr(data))
Chris Liechti35f927d2016-05-23 22:35:06 +0200406 if b'\n' in data:
407 self.transport.close()
Chris Liechtid07a9e72015-08-22 02:58:47 +0200408
409 def connection_lost(self, exc):
410 print('port closed')
411 asyncio.get_event_loop().stop()
412
Robert Smallshire4f429c92016-03-30 18:48:57 +0200413 def pause_writing(self):
414 print('pause writing')
415 print(self.transport.get_write_buffer_size())
416
417 def resume_writing(self):
418 print(self.transport.get_write_buffer_size())
419 print('resume writing')
420
Chris Liechtid07a9e72015-08-22 02:58:47 +0200421 loop = asyncio.get_event_loop()
422 coro = create_serial_connection(loop, Output, '/dev/ttyUSB0', baudrate=115200)
423 loop.run_until_complete(coro)
424 loop.run_forever()
425 loop.close()