diff --git a/Lib/_pyio.py b/Lib/_pyio.py
index 3fa9325..a2faeb3 100644
--- a/Lib/_pyio.py
+++ b/Lib/_pyio.py
@@ -6,6 +6,7 @@
 import abc
 import codecs
 import warnings
+import errno
 # Import _thread instead of threading to reduce startup cost
 try:
     from _thread import allocate_lock as Lock
@@ -720,8 +721,11 @@
 
     def close(self):
         if self.raw is not None and not self.closed:
-            self.flush()
-            self.raw.close()
+            try:
+                # may raise BlockingIOError or BrokenPipeError etc
+                self.flush()
+            finally:
+                self.raw.close()
 
     def detach(self):
         if self.raw is None:
@@ -1080,13 +1084,9 @@
             # XXX we can implement some more tricks to try and avoid
             # partial writes
             if len(self._write_buf) > self.buffer_size:
-                # We're full, so let's pre-flush the buffer
-                try:
-                    self._flush_unlocked()
-                except BlockingIOError as e:
-                    # We can't accept anything else.
-                    # XXX Why not just let the exception pass through?
-                    raise BlockingIOError(e.errno, e.strerror, 0)
+                # We're full, so let's pre-flush the buffer.  (This may
+                # raise BlockingIOError with characters_written == 0.)
+                self._flush_unlocked()
             before = len(self._write_buf)
             self._write_buf.extend(b)
             written = len(self._write_buf) - before
@@ -1117,24 +1117,23 @@
     def _flush_unlocked(self):
         if self.closed:
             raise ValueError("flush of closed file")
-        written = 0
-        try:
-            while self._write_buf:
-                try:
-                    n = self.raw.write(self._write_buf)
-                except IOError as e:
-                    if e.errno != EINTR:
-                        raise
-                    continue
-                if n > len(self._write_buf) or n < 0:
-                    raise IOError("write() returned incorrect number of bytes")
-                del self._write_buf[:n]
-                written += n
-        except BlockingIOError as e:
-            n = e.characters_written
+        while self._write_buf:
+            try:
+                n = self.raw.write(self._write_buf)
+            except BlockingIOError:
+                raise RuntimeError("self.raw should implement RawIOBase: it "
+                                   "should not raise BlockingIOError")
+            except IOError as e:
+                if e.errno != EINTR:
+                    raise
+                continue
+            if n is None:
+                raise BlockingIOError(
+                    errno.EAGAIN,
+                    "write could not complete without blocking", 0)
+            if n > len(self._write_buf) or n < 0:
+                raise IOError("write() returned incorrect number of bytes")
             del self._write_buf[:n]
-            written += n
-            raise BlockingIOError(e.errno, e.strerror, written)
 
     def tell(self):
         return _BufferedIOMixin.tell(self) + len(self._write_buf)
diff --git a/Lib/test/test_io.py b/Lib/test/test_io.py
index eb2ac5f..df7ca15 100644
--- a/Lib/test/test_io.py
+++ b/Lib/test/test_io.py
@@ -42,7 +42,10 @@
     import threading
 except ImportError:
     threading = None
-
+try:
+    import fcntl
+except ImportError:
+    fcntl = None
 
 def _default_chunk_size():
     """Get the default TextIOWrapper chunk size"""
@@ -242,9 +245,14 @@
             except ValueError:
                 pass
             else:
-                self._blocker_char = None
-                self._write_stack.append(b[:n])
-                raise self.BlockingIOError(0, "test blocking", n)
+                if n > 0:
+                    # write data up to the first blocker
+                    self._write_stack.append(b[:n])
+                    return n
+                else:
+                    # cancel blocker and indicate would block
+                    self._blocker_char = None
+                    return None
         self._write_stack.append(b)
         return len(b)
 
@@ -2743,6 +2751,68 @@
                 with self.open(support.TESTFN, **kwargs) as f:
                     self.assertRaises(TypeError, pickle.dumps, f, protocol)
 
+    @unittest.skipUnless(fcntl, 'fcntl required for this test')
+    def test_nonblock_pipe_write_bigbuf(self):
+        self._test_nonblock_pipe_write(16*1024)
+
+    @unittest.skipUnless(fcntl, 'fcntl required for this test')
+    def test_nonblock_pipe_write_smallbuf(self):
+        self._test_nonblock_pipe_write(1024)
+
+    def _set_non_blocking(self, fd):
+        flags = fcntl.fcntl(fd, fcntl.F_GETFL)
+        self.assertNotEqual(flags, -1)
+        res = fcntl.fcntl(fd, fcntl.F_SETFL, flags | os.O_NONBLOCK)
+        self.assertEqual(res, 0)
+
+    def _test_nonblock_pipe_write(self, bufsize):
+        sent = []
+        received = []
+        r, w = os.pipe()
+        self._set_non_blocking(r)
+        self._set_non_blocking(w)
+
+        # To exercise all code paths in the C implementation we need
+        # to play with buffer sizes.  For instance, if we choose a
+        # buffer size less than or equal to _PIPE_BUF (4096 on Linux)
+        # then we will never get a partial write of the buffer.
+        rf = self.open(r, mode='rb', closefd=True, buffering=bufsize)
+        wf = self.open(w, mode='wb', closefd=True, buffering=bufsize)
+
+        with rf, wf:
+            for N in 9999, 73, 7574:
+                try:
+                    i = 0
+                    while True:
+                        msg = bytes([i % 26 + 97]) * N
+                        sent.append(msg)
+                        wf.write(msg)
+                        i += 1
+
+                except self.BlockingIOError as e:
+                    self.assertEqual(e.args[0], errno.EAGAIN)
+                    sent[-1] = sent[-1][:e.characters_written]
+                    received.append(rf.read())
+                    msg = b'BLOCKED'
+                    wf.write(msg)
+                    sent.append(msg)
+
+            while True:
+                try:
+                    wf.flush()
+                    break
+                except self.BlockingIOError as e:
+                    self.assertEqual(e.args[0], errno.EAGAIN)
+                    self.assertEqual(e.characters_written, 0)
+                    received.append(rf.read())
+
+            received += iter(rf.read, None)
+
+        sent, received = b''.join(sent), b''.join(received)
+        self.assertTrue(sent == received)
+        self.assertTrue(wf.closed)
+        self.assertTrue(rf.closed)
+
 class CMiscIOTest(MiscIOTest):
     io = io
 
diff --git a/Misc/ACKS b/Misc/ACKS
index c66edb7..7f7296e 100644
--- a/Misc/ACKS
+++ b/Misc/ACKS
@@ -796,6 +796,7 @@
 Mark Sapiro
 Ty Sarna
 Ben Sayer
+sbt
 Andrew Schaaf
 Michael Scharf
 Andreas Schawo
diff --git a/Misc/NEWS b/Misc/NEWS
index c3765dd..4e43b45 100644
--- a/Misc/NEWS
+++ b/Misc/NEWS
@@ -80,6 +80,12 @@
 Library
 -------
 
+- Issue #13322: Fix BufferedWriter.write() to ensure that BlockingIOError is
+  raised when the wrapped raw file is non-blocking and the write would block.
+  Previous code assumed that the raw write() would raise BlockingIOError, but
+  RawIOBase.write() is defined to returned None when the call would block.
+  Patch by sbt.
+
 - Issue #13358: HTMLParser now calls handle_data only once for each CDATA.
 
 - Issue #4147: minidom's toprettyxml no longer adds whitespace around a text
diff --git a/Modules/_io/bufferedio.c b/Modules/_io/bufferedio.c
index c979ac2..4c43c69 100644
--- a/Modules/_io/bufferedio.c
+++ b/Modules/_io/bufferedio.c
@@ -581,7 +581,7 @@
 
 /* Forward decls */
 static PyObject *
-_bufferedwriter_flush_unlocked(buffered *, int);
+_bufferedwriter_flush_unlocked(buffered *);
 static Py_ssize_t
 _bufferedreader_fill_buffer(buffered *self);
 static void
@@ -602,6 +602,18 @@
  * Helpers
  */
 
+/* Sets the current error to BlockingIOError */
+static void
+_set_BlockingIOError(char *msg, Py_ssize_t written)
+{
+    PyObject *err;
+    err = PyObject_CallFunction(PyExc_BlockingIOError, "isn",
+                                errno, msg, written);
+    if (err)
+        PyErr_SetObject(PyExc_BlockingIOError, err);
+    Py_XDECREF(err);
+}
+
 /* Returns the address of the `written` member if a BlockingIOError was
    raised, NULL otherwise. The error is always re-raised. */
 static Py_ssize_t *
@@ -756,7 +768,7 @@
 {
     PyObject *res;
 
-    res = _bufferedwriter_flush_unlocked(self, 0);
+    res = _bufferedwriter_flush_unlocked(self);
     if (res == NULL)
         return NULL;
     Py_DECREF(res);
@@ -1121,7 +1133,7 @@
 
     /* Fallback: invoke raw seek() method and clear buffer */
     if (self->writable) {
-        res = _bufferedwriter_flush_unlocked(self, 0);
+        res = _bufferedwriter_flush_unlocked(self);
         if (res == NULL)
             goto end;
         Py_CLEAR(res);
@@ -1714,6 +1726,7 @@
     Py_buffer buf;
     PyObject *memobj, *res;
     Py_ssize_t n;
+    int errnum;
     /* NOTE: the buffer needn't be released as its object is NULL. */
     if (PyBuffer_FillInfo(&buf, NULL, start, len, 1, PyBUF_CONTIG_RO) == -1)
         return -1;
@@ -1726,11 +1739,21 @@
        raised (see issue #10956).
     */
     do {
+        errno = 0;
         res = PyObject_CallMethodObjArgs(self->raw, _PyIO_str_write, memobj, NULL);
+        errnum = errno;
     } while (res == NULL && _trap_eintr());
     Py_DECREF(memobj);
     if (res == NULL)
         return -1;
+    if (res == Py_None) {
+        /* Non-blocking stream would have blocked. Special return code!
+           Being paranoid we reset errno in case it is changed by code
+           triggered by a decref.  errno is used by _set_BlockingIOError(). */
+        Py_DECREF(res);
+        errno = errnum;
+        return -2;
+    }
     n = PyNumber_AsSsize_t(res, PyExc_ValueError);
     Py_DECREF(res);
     if (n < 0 || n > len) {
@@ -1747,7 +1770,7 @@
 /* `restore_pos` is 1 if we need to restore the raw stream position at
    the end, 0 otherwise. */
 static PyObject *
-_bufferedwriter_flush_unlocked(buffered *self, int restore_pos)
+_bufferedwriter_flush_unlocked(buffered *self)
 {
     Py_ssize_t written = 0;
     Py_off_t n, rewind;
@@ -1769,14 +1792,11 @@
             Py_SAFE_DOWNCAST(self->write_end - self->write_pos,
                              Py_off_t, Py_ssize_t));
         if (n == -1) {
-            Py_ssize_t *w = _buffered_check_blocking_error();
-            if (w == NULL)
-                goto error;
-            self->write_pos += *w;
-            self->raw_pos = self->write_pos;
-            written += *w;
-            *w = written;
-            /* Already re-raised */
+            goto error;
+        }
+        else if (n == -2) {
+            _set_BlockingIOError("write could not complete without blocking",
+                                 0);
             goto error;
         }
         self->write_pos += n;
@@ -1789,16 +1809,6 @@
             goto error;
     }
 
-    if (restore_pos) {
-        Py_off_t forward = rewind - written;
-        if (forward != 0) {
-            n = _buffered_raw_seek(self, forward, 1);
-            if (n < 0) {
-                goto error;
-            }
-            self->raw_pos += forward;
-        }
-    }
     _bufferedwriter_reset_buf(self);
 
 end:
@@ -1851,7 +1861,7 @@
     }
 
     /* First write the current buffer */
-    res = _bufferedwriter_flush_unlocked(self, 0);
+    res = _bufferedwriter_flush_unlocked(self);
     if (res == NULL) {
         Py_ssize_t *w = _buffered_check_blocking_error();
         if (w == NULL)
@@ -1874,14 +1884,19 @@
             PyErr_Clear();
             memcpy(self->buffer + self->write_end, buf.buf, buf.len);
             self->write_end += buf.len;
+            self->pos += buf.len;
             written = buf.len;
             goto end;
         }
         /* Buffer as much as possible. */
         memcpy(self->buffer + self->write_end, buf.buf, avail);
         self->write_end += avail;
-        /* Already re-raised */
-        *w = avail;
+        self->pos += avail;
+        /* XXX Modifying the existing exception e using the pointer w
+           will change e.characters_written but not e.args[2].
+           Therefore we just replace with a new error. */
+        _set_BlockingIOError("write could not complete without blocking",
+                             avail);
         goto error;
     }
     Py_CLEAR(res);
@@ -1906,11 +1921,9 @@
         Py_ssize_t n = _bufferedwriter_raw_write(
             self, (char *) buf.buf + written, buf.len - written);
         if (n == -1) {
-            Py_ssize_t *w = _buffered_check_blocking_error();
-            if (w == NULL)
-                goto error;
-            written += *w;
-            remaining -= *w;
+            goto error;
+        } else if (n == -2) {
+            /* Write failed because raw file is non-blocking */
             if (remaining > self->buffer_size) {
                 /* Can't buffer everything, still buffer as much as possible */
                 memcpy(self->buffer,
@@ -1918,8 +1931,9 @@
                 self->raw_pos = 0;
                 ADJUST_POSITION(self, self->buffer_size);
                 self->write_end = self->buffer_size;
-                *w = written + self->buffer_size;
-                /* Already re-raised */
+                written += self->buffer_size;
+                _set_BlockingIOError("write could not complete without "
+                                     "blocking", written);
                 goto error;
             }
             PyErr_Clear();
