Non-blocking asychronous SerialTransport with write-buffering.
diff --git a/serial/aio.py b/serial/aio.py
index 76d4f18..3ba12d2 100644
--- a/serial/aio.py
+++ b/serial/aio.py
@@ -16,23 +16,52 @@
 """
 import asyncio
 import serial
-import logging
 
 
 class SerialTransport(asyncio.Transport):
+    """An asyncio transport model of a serial communication channel.
+
+    A transport class is an abstraction of a communication channel.
+    This allows protocol implementations to be developed against the
+    transport abstraction without needing to know the details of the
+    underlying channel, such as whether it is a pipe, a socket, or
+    indeed a serial port.
+
+
+    You generally won’t instantiate a transport yourself; instead, you
+    will call `create_serial_connection` which will create the
+    transport and try to initiate the underlying communication channel,
+    calling you back when it succeeds.
+    """
+
     def __init__(self, loop, protocol, serial_instance):
         super().__init__()
         self._loop = loop
         self._protocol = protocol
-        self.serial = serial_instance
+        self._serial = serial_instance
         self._closing = False
-        self._paused = False
+        self._protocol_paused = False
+        self._max_read_size = 1024
+        self._write_buffer = []
+        self._set_write_buffer_limits()
+        self._has_reader = False
+        self._has_writer = False
+
         # XXX how to support url handlers too
-        self.serial.timeout = 0
-        self.serial.nonblocking()
+
+        # Asynchronous I/O requires non-blocking devices
+        self._serial.timeout = 0
+        self._serial.write_timeout = 0
+        self._serial.nonblocking()
+
+        # These two callbacks will be enqueued in a FIFO queue by asyncio
         loop.call_soon(protocol.connection_made, self)
-        # only start reading when connection_made() has been called
-        loop.call_soon(loop.add_reader, self.serial.fd, self._read_ready)
+        loop.call_soon(self._ensure_reader)
+
+    @property
+    def serial(self):
+        """The underlying Serial instance."""
+        return self._serial
 
     def __repr__(self):
         return '{self.__class__.__name__}({self._loop}, {self._protocol}, {self.serial})'.format(self=self)
@@ -41,57 +70,288 @@
         """Return True if the transport is closing or closed."""
         return self._closing
 
-    def close(self, exc=None):
-        if self._closing:
-            return
-        self._closing = True
-        self._loop.remove_reader(self.serial.fd)
-        self.serial.close()
-        self._loop.call_soon(self._protocol.connection_lost, exc)
+    def close(self):
+        """Close the transport gracefully.
+
+        Any buffered data will be written asynchronously. No more data
+        will be received and further writes will be silently ignored.
+        After all buffered data is flushed, the protocol's
+        connection_lost() method will be called with None as its
+        argument.
+        """
+        if not self._closing:
+            self._close(None)
 
     def _read_ready(self):
         try:
-            data = self.serial.read(1024)
+            data = self._serial.read(self._max_read_size)
         except serial.SerialException as e:
-            self.close(exc=e)
+            self._close(exc=e)
         else:
             if data:
                 self._protocol.data_received(data)
 
     def write(self, data):
-        try:
-            self.serial.write(data)
-        except serial.SerialException as e:
-            self.close(exc=e)
+        """Write some data to the transport.
+
+        This method does not block; it buffers the data and arranges
+        for it to be sent out asynchronously.  Writes made after the
+        transport has been closed will be ignored."""
+        if self._closing:
+            return
+
+        if self.get_write_buffer_size() == 0:
+            # Attempt to send it right away first
+            try:
+                n = self._serial.write(data)
+            except serial.SerialException as exc:
+                self._fatal_error(exc, 'Fatal write error on serial transport')
+                return
+            if n == len(data):
+                return  # Whole request satisfied
+            assert n > 0
+            data = data[n:]
+            self._ensure_writer()
+
+        self._write_buffer.append(data)
+        self._maybe_pause_protocol()
 
     def can_write_eof(self):
+        """Serial ports do not support the concept of end-of-file.
+
+        Always returns False.
+        """
         return False
 
     def pause_reading(self):
-        if self._closing:
-            raise RuntimeError('Cannot pause_reading() when closing')
-        if self._paused:
-            raise RuntimeError('Already paused')
-        self._paused = True
-        self._loop.remove_reader(self.serial.fd)
-        if self._loop.get_debug():
-            logging.debug("%r pauses reading", self)
+        """Pause the receiving end of the transport.
+
+        No data will be passed to the protocol’s data_received() method
+        until resume_reading() is called.
+        """
+        self._remove_reader()
 
     def resume_reading(self):
-        if not self._paused:
-            raise RuntimeError('Not paused')
-        self._paused = False
-        if self._closing:
-            return
-        self._loop.add_reader(self.serial.fd, self._read_ready)
-        if self._loop.get_debug():
-            logging.debug("%r resumes reading", self)
+        """Resume the receiving end of the transport.
 
-    #~ def set_write_buffer_limits(self, high=None, low=None):
-    #~ def get_write_buffer_size(self):
-    #~ def writelines(self, list_of_data):
-    #~ def write_eof(self):
-    #~ def abort(self):
+        Incoming data will be passed to the protocol's data_received()
+        method until pause_reading() is called.
+        """
+        self._ensure_reader()
+
+    def set_write_buffer_limits(self, high=None, low=None):
+        """Set the high- and low-water limits for write flow control.
+
+        These two values control when call the protocol’s
+        pause_writing()and resume_writing() methods are called. If
+        specified, the low-water limit must be less than or equal to
+        the high-water limit. Neither high nor low can be negative.
+        """
+        self._set_write_buffer_limits(high=high, low=low)
+        self._maybe_pause_protocol()
+
+    def get_write_buffer_size(self):
+        """The number of bytes in the write buffer.
+
+        This buffer is unbounded, so the result may be larger than the
+        the high water mark.
+        """
+        return sum(map(len, self._write_buffer))
+
+    def write_eof(self):
+        raise NotImplementedError("Serial connections do not support end-of-file")
+
+    def abort(self):
+        """Close the transport immediately.
+
+        Pending operations will not be given opportunity to complete,
+        and buffered data will be lost. No more data will be received
+        and further writes will be ignored.  The protocol's
+        connection_lost() method will eventually be called.
+        """
+        self._abort(None)
+
+    def _maybe_pause_protocol(self):
+        """To be called whenever the write-buffer size increases.
+
+        Tests the current write-buffer size against the high water
+        mark configured for this transport. If the high water mark is
+        exceeded, the protocol is instructed to pause_writing().
+        """
+        if self.get_write_buffer_size() <= self._high_water:
+            return
+        if not self._protocol_paused:
+            self._protocol_paused = True
+            try:
+                self._protocol.pause_writing()
+            except Exception as exc:
+                self._loop.call_exception_handler({
+                    'message': 'protocol.pause_writing() failed',
+                    'exception': exc,
+                    'transport': self,
+                    'protocol': self._protocol,
+                })
+
+    def _maybe_resume_protocol(self):
+        """To be called whenever the write-buffer size decreases.
+
+        Tests the current write-buffer size against the low water
+        mark configured for this transport. If the write-buffer
+        size is below the low water mark, the protocol is
+        instructed that is can resume_writing().
+        """
+        if (self._protocol_paused and
+                self.get_write_buffer_size() <= self._low_water):
+            self._protocol_paused = False
+            try:
+                self._protocol.resume_writing()
+            except Exception as exc:
+                self._loop.call_exception_handler({
+                    'message': 'protocol.resume_writing() failed',
+                    'exception': exc,
+                    'transport': self,
+                    'protocol': self._protocol,
+                })
+
+    def _write_ready(self):
+        """Asynchronously write buffered data.
+
+        This method is called back asynchronously as a writer
+        registered with the asyncio event-loop against the
+        underlying file descriptor for the serial port.
+
+        Should the write-buffer become empty if this method
+        is invoked while the transport is closing, the protocol's
+        connection_lost() method will be called with None as its
+        argument.
+        """
+        data = b''.join(self._write_buffer)
+        num_bytes = len(data)
+        assert data, 'Write buffer should not be empty'
+
+        self._write_buffer.clear()
+
+        try:
+            n = self._serial.write(data)
+        except (BlockingIOError, InterruptedError):
+            self._write_buffer.append(data)
+        except serial.SerialException as exc:
+            self._fatal_error(exc, 'Fatal write error on serial transport')
+        else:
+            if n == len(data):
+                assert self._flushed()
+                self._remove_writer()
+                self._maybe_resume_protocol()  # May cause further writes
+                # _write_ready may have been invoked by the event loop
+                # after the transport was closed, as part of the ongoing
+                # process of flushing buffered data. If the buffer
+                # is now empty, we can close the connection
+                if self._closing and self._flushed():
+                    self._close()
+                return
+
+            assert n > 0
+            data = data[n:]
+            self._write_buffer.append(data)  # Try again later
+            self._maybe_resume_protocol()
+            assert self._has_writer
+
+    def _ensure_reader(self):
+        if (not self._has_reader) and (not self._closing):
+            self._loop.add_reader(self._serial.fd, self._read_ready)
+            self._has_reader = True
+
+    def _remove_reader(self):
+        if self._has_reader:
+            self._loop.remove_reader(self._serial.fd)
+            self._has_reader = False
+
+    def _ensure_writer(self):
+        if (not self._has_writer) and (not self._closing):
+            self._loop.add_writer(self._serial.fd, self._write_ready)
+            self._has_writer = True
+
+    def _remove_writer(self):
+        if self._has_writer:
+            self._loop.remove_writer(self._serial.fd)
+            self._has_writer = False
+
+    def _set_write_buffer_limits(self, high=None, low=None):
+        """Ensure consistent write-buffer limits."""
+        if high is None:
+            high = 64*1024 if low is None else 4*low
+        if low is None:
+            low = high // 4
+        if not high >= low >= 0:
+            raise ValueError('high (%r) must be >= low (%r) must be >= 0' %
+                             (high, low))
+        self._high_water = high
+        self._low_water = low
+
+    def _fatal_error(self, exc, message='Fatal error on serial transport'):
+        """Report a fatal error to the event-loop and abort the transport."""
+        self._loop.call_exception_handler({
+            'message': message,
+            'exception': exc,
+            'transport': self,
+            'protocol': self._protocol,
+        })
+        self._abort(exc)
+
+    def _flushed(self):
+        """True if the write buffer is empty, otherwise False."""
+        return self.get_write_buffer_size() == 0
+
+    def _close(self, exc=None):
+        """Close the transport gracefully.
+
+        If the write buffer is already empty, writing will be
+        stopped immediately and a call to the protocol's
+        connection_lost() method scheduled.
+
+        If the write buffer is not already empty, the
+        asynchronous writing will continue, and the _write_ready
+        method will call this _close method again when the
+        buffer has been flushed completely.
+        """
+        self._closing = True
+        self._remove_reader()
+        if self._flushed():
+            self._remove_writer()
+            self._loop.call_soon(self._call_connection_lost, exc)
+
+    def _abort(self, exc):
+        """Close the transport immediately.
+
+        Pending operations will not be given opportunity to complete,
+        and buffered data will be lost. No more data will be received
+        and further writes will be ignored.  The protocol's
+        connection_lost() method will eventually be called with the
+        passed exception.
+        """
+        self._closing = True
+        self._remove_reader()
+        self._remove_writer()  # Pending buffered data will not be written
+        self._loop.call_soon(self._call_connection_lost, exc)
+
+    def _call_connection_lost(self, exc):
+        """Close the connection.
+
+        Informs the protocol through connection_lost() and clears
+        pending buffers and closes the serial connection.
+        """
+        assert self._closing
+        assert not self._has_writer
+        assert not self._has_reader
+        self._serial.flush()
+        try:
+            self._protocol.connection_lost(exc)
+        finally:
+            self._write_buffer.clear()
+            self._serial.close()
+            self._serial = None
+            self._protocol = None
+            self._loop = None
 
 
 @asyncio.coroutine
@@ -150,6 +410,14 @@
             print('port closed')
             asyncio.get_event_loop().stop()
 
+        def pause_writing(self):
+            print('pause writing')
+            print(self.transport.get_write_buffer_size())
+
+        def resume_writing(self):
+            print(self.transport.get_write_buffer_size())
+            print('resume writing')
+
     loop = asyncio.get_event_loop()
     coro = create_serial_connection(loop, Output, '/dev/ttyUSB0', baudrate=115200)
     loop.run_until_complete(coro)