diff --git a/Lib/asyncio/selector_events.py b/Lib/asyncio/selector_events.py
index 2f02d76..12d357b 100644
--- a/Lib/asyncio/selector_events.py
+++ b/Lib/asyncio/selector_events.py
@@ -11,6 +11,7 @@
 import functools
 import socket
 import warnings
+import weakref
 try:
     import ssl
 except ImportError:  # pragma: no cover
@@ -64,6 +65,7 @@
         logger.debug('Using selector: %s', selector.__class__.__name__)
         self._selector = selector
         self._make_self_pipe()
+        self._transports = weakref.WeakValueDictionary()
 
     def _make_socket_transport(self, sock, protocol, waiter=None, *,
                                extra=None, server=None):
@@ -115,7 +117,7 @@
         raise NotImplementedError
 
     def _close_self_pipe(self):
-        self.remove_reader(self._ssock.fileno())
+        self._remove_reader(self._ssock.fileno())
         self._ssock.close()
         self._ssock = None
         self._csock.close()
@@ -128,7 +130,7 @@
         self._ssock.setblocking(False)
         self._csock.setblocking(False)
         self._internal_fds += 1
-        self.add_reader(self._ssock.fileno(), self._read_from_self)
+        self._add_reader(self._ssock.fileno(), self._read_from_self)
 
     def _process_self_data(self, data):
         pass
@@ -163,8 +165,8 @@
 
     def _start_serving(self, protocol_factory, sock,
                        sslcontext=None, server=None, backlog=100):
-        self.add_reader(sock.fileno(), self._accept_connection,
-                        protocol_factory, sock, sslcontext, server, backlog)
+        self._add_reader(sock.fileno(), self._accept_connection,
+                         protocol_factory, sock, sslcontext, server, backlog)
 
     def _accept_connection(self, protocol_factory, sock,
                            sslcontext=None, server=None, backlog=100):
@@ -194,7 +196,7 @@
                         'exception': exc,
                         'socket': sock,
                     })
-                    self.remove_reader(sock.fileno())
+                    self._remove_reader(sock.fileno())
                     self.call_later(constants.ACCEPT_RETRY_DELAY,
                                     self._start_serving,
                                     protocol_factory, sock, sslcontext, server,
@@ -244,8 +246,18 @@
                     context['transport'] = transport
                 self.call_exception_handler(context)
 
-    def add_reader(self, fd, callback, *args):
-        """Add a reader callback."""
+    def _ensure_fd_no_transport(self, fd):
+        try:
+            transport = self._transports[fd]
+        except KeyError:
+            pass
+        else:
+            if not transport.is_closing():
+                raise RuntimeError(
+                    'File descriptor {!r} is used by transport {!r}'.format(
+                        fd, transport))
+
+    def _add_reader(self, fd, callback, *args):
         self._check_closed()
         handle = events.Handle(callback, args, self)
         try:
@@ -260,8 +272,7 @@
             if reader is not None:
                 reader.cancel()
 
-    def remove_reader(self, fd):
-        """Remove a reader callback."""
+    def _remove_reader(self, fd):
         if self.is_closed():
             return False
         try:
@@ -282,8 +293,7 @@
             else:
                 return False
 
-    def add_writer(self, fd, callback, *args):
-        """Add a writer callback.."""
+    def _add_writer(self, fd, callback, *args):
         self._check_closed()
         handle = events.Handle(callback, args, self)
         try:
@@ -298,7 +308,7 @@
             if writer is not None:
                 writer.cancel()
 
-    def remove_writer(self, fd):
+    def _remove_writer(self, fd):
         """Remove a writer callback."""
         if self.is_closed():
             return False
@@ -321,6 +331,26 @@
             else:
                 return False
 
+    def add_reader(self, fd, callback, *args):
+        """Add a reader callback."""
+        self._ensure_fd_no_transport(fd)
+        return self._add_reader(fd, callback, *args)
+
+    def remove_reader(self, fd):
+        """Remove a reader callback."""
+        self._ensure_fd_no_transport(fd)
+        return self._remove_reader(fd)
+
+    def add_writer(self, fd, callback, *args):
+        """Add a writer callback.."""
+        self._ensure_fd_no_transport(fd)
+        return self._add_writer(fd, callback, *args)
+
+    def remove_writer(self, fd):
+        """Remove a writer callback."""
+        self._ensure_fd_no_transport(fd)
+        return self._remove_writer(fd)
+
     def sock_recv(self, sock, n):
         """Receive data from the socket.
 
@@ -494,17 +524,17 @@
             fileobj, (reader, writer) = key.fileobj, key.data
             if mask & selectors.EVENT_READ and reader is not None:
                 if reader._cancelled:
-                    self.remove_reader(fileobj)
+                    self._remove_reader(fileobj)
                 else:
                     self._add_callback(reader)
             if mask & selectors.EVENT_WRITE and writer is not None:
                 if writer._cancelled:
-                    self.remove_writer(fileobj)
+                    self._remove_writer(fileobj)
                 else:
                     self._add_callback(writer)
 
     def _stop_serving(self, sock):
-        self.remove_reader(sock.fileno())
+        self._remove_reader(sock.fileno())
         sock.close()
 
 
@@ -539,6 +569,7 @@
         self._closing = False  # Set when close() called.
         if self._server is not None:
             self._server._attach()
+        loop._transports[self._sock_fd] = self
 
     def __repr__(self):
         info = [self.__class__.__name__]
@@ -584,10 +615,10 @@
         if self._closing:
             return
         self._closing = True
-        self._loop.remove_reader(self._sock_fd)
+        self._loop._remove_reader(self._sock_fd)
         if not self._buffer:
             self._conn_lost += 1
-            self._loop.remove_writer(self._sock_fd)
+            self._loop._remove_writer(self._sock_fd)
             self._loop.call_soon(self._call_connection_lost, None)
 
     # On Python 3.3 and older, objects with a destructor part of a reference
@@ -618,10 +649,10 @@
             return
         if self._buffer:
             self._buffer.clear()
-            self._loop.remove_writer(self._sock_fd)
+            self._loop._remove_writer(self._sock_fd)
         if not self._closing:
             self._closing = True
-            self._loop.remove_reader(self._sock_fd)
+            self._loop._remove_reader(self._sock_fd)
         self._conn_lost += 1
         self._loop.call_soon(self._call_connection_lost, exc)
 
@@ -658,7 +689,7 @@
 
         self._loop.call_soon(self._protocol.connection_made, self)
         # only start reading when connection_made() has been called
-        self._loop.call_soon(self._loop.add_reader,
+        self._loop.call_soon(self._loop._add_reader,
                              self._sock_fd, self._read_ready)
         if waiter is not None:
             # only wake up the waiter when connection_made() has been called
@@ -671,7 +702,7 @@
         if self._paused:
             raise RuntimeError('Already paused')
         self._paused = True
-        self._loop.remove_reader(self._sock_fd)
+        self._loop._remove_reader(self._sock_fd)
         if self._loop.get_debug():
             logger.debug("%r pauses reading", self)
 
@@ -681,7 +712,7 @@
         self._paused = False
         if self._closing:
             return
-        self._loop.add_reader(self._sock_fd, self._read_ready)
+        self._loop._add_reader(self._sock_fd, self._read_ready)
         if self._loop.get_debug():
             logger.debug("%r resumes reading", self)
 
@@ -705,7 +736,7 @@
                     # We're keeping the connection open so the
                     # protocol can write more, but we still can't
                     # receive more, so remove the reader callback.
-                    self._loop.remove_reader(self._sock_fd)
+                    self._loop._remove_reader(self._sock_fd)
                 else:
                     self.close()
 
@@ -738,7 +769,7 @@
                 if not data:
                     return
             # Not all was written; register write handler.
-            self._loop.add_writer(self._sock_fd, self._write_ready)
+            self._loop._add_writer(self._sock_fd, self._write_ready)
 
         # Add it to the buffer.
         self._buffer.extend(data)
@@ -754,7 +785,7 @@
         except (BlockingIOError, InterruptedError):
             pass
         except Exception as exc:
-            self._loop.remove_writer(self._sock_fd)
+            self._loop._remove_writer(self._sock_fd)
             self._buffer.clear()
             self._fatal_error(exc, 'Fatal write error on socket transport')
         else:
@@ -762,7 +793,7 @@
                 del self._buffer[:n]
             self._maybe_resume_protocol()  # May append to buffer.
             if not self._buffer:
-                self._loop.remove_writer(self._sock_fd)
+                self._loop._remove_writer(self._sock_fd)
                 if self._closing:
                     self._call_connection_lost(None)
                 elif self._eof:
@@ -833,19 +864,19 @@
         try:
             self._sock.do_handshake()
         except ssl.SSLWantReadError:
-            self._loop.add_reader(self._sock_fd,
-                                  self._on_handshake, start_time)
+            self._loop._add_reader(self._sock_fd,
+                                   self._on_handshake, start_time)
             return
         except ssl.SSLWantWriteError:
-            self._loop.add_writer(self._sock_fd,
-                                  self._on_handshake, start_time)
+            self._loop._add_writer(self._sock_fd,
+                                   self._on_handshake, start_time)
             return
         except BaseException as exc:
             if self._loop.get_debug():
                 logger.warning("%r: SSL handshake failed",
                                self, exc_info=True)
-            self._loop.remove_reader(self._sock_fd)
-            self._loop.remove_writer(self._sock_fd)
+            self._loop._remove_reader(self._sock_fd)
+            self._loop._remove_writer(self._sock_fd)
             self._sock.close()
             self._wakeup_waiter(exc)
             if isinstance(exc, Exception):
@@ -853,8 +884,8 @@
             else:
                 raise
 
-        self._loop.remove_reader(self._sock_fd)
-        self._loop.remove_writer(self._sock_fd)
+        self._loop._remove_reader(self._sock_fd)
+        self._loop._remove_writer(self._sock_fd)
 
         peercert = self._sock.getpeercert()
         if not hasattr(self._sslcontext, 'check_hostname'):
@@ -882,7 +913,7 @@
 
         self._read_wants_write = False
         self._write_wants_read = False
-        self._loop.add_reader(self._sock_fd, self._read_ready)
+        self._loop._add_reader(self._sock_fd, self._read_ready)
         self._protocol_connected = True
         self._loop.call_soon(self._protocol.connection_made, self)
         # only wake up the waiter when connection_made() has been called
@@ -904,7 +935,7 @@
         if self._paused:
             raise RuntimeError('Already paused')
         self._paused = True
-        self._loop.remove_reader(self._sock_fd)
+        self._loop._remove_reader(self._sock_fd)
         if self._loop.get_debug():
             logger.debug("%r pauses reading", self)
 
@@ -914,7 +945,7 @@
         self._paused = False
         if self._closing:
             return
-        self._loop.add_reader(self._sock_fd, self._read_ready)
+        self._loop._add_reader(self._sock_fd, self._read_ready)
         if self._loop.get_debug():
             logger.debug("%r resumes reading", self)
 
@@ -926,7 +957,7 @@
             self._write_ready()
 
             if self._buffer:
-                self._loop.add_writer(self._sock_fd, self._write_ready)
+                self._loop._add_writer(self._sock_fd, self._write_ready)
 
         try:
             data = self._sock.recv(self.max_size)
@@ -934,8 +965,8 @@
             pass
         except ssl.SSLWantWriteError:
             self._read_wants_write = True
-            self._loop.remove_reader(self._sock_fd)
-            self._loop.add_writer(self._sock_fd, self._write_ready)
+            self._loop._remove_reader(self._sock_fd)
+            self._loop._add_writer(self._sock_fd, self._write_ready)
         except Exception as exc:
             self._fatal_error(exc, 'Fatal read error on SSL transport')
         else:
@@ -960,7 +991,7 @@
             self._read_ready()
 
             if not (self._paused or self._closing):
-                self._loop.add_reader(self._sock_fd, self._read_ready)
+                self._loop._add_reader(self._sock_fd, self._read_ready)
 
         if self._buffer:
             try:
@@ -969,10 +1000,10 @@
                 n = 0
             except ssl.SSLWantReadError:
                 n = 0
-                self._loop.remove_writer(self._sock_fd)
+                self._loop._remove_writer(self._sock_fd)
                 self._write_wants_read = True
             except Exception as exc:
-                self._loop.remove_writer(self._sock_fd)
+                self._loop._remove_writer(self._sock_fd)
                 self._buffer.clear()
                 self._fatal_error(exc, 'Fatal write error on SSL transport')
                 return
@@ -983,7 +1014,7 @@
         self._maybe_resume_protocol()  # May append to buffer.
 
         if not self._buffer:
-            self._loop.remove_writer(self._sock_fd)
+            self._loop._remove_writer(self._sock_fd)
             if self._closing:
                 self._call_connection_lost(None)
 
@@ -1001,7 +1032,7 @@
             return
 
         if not self._buffer:
-            self._loop.add_writer(self._sock_fd, self._write_ready)
+            self._loop._add_writer(self._sock_fd, self._write_ready)
 
         # Add it to the buffer.
         self._buffer.extend(data)
@@ -1021,7 +1052,7 @@
         self._address = address
         self._loop.call_soon(self._protocol.connection_made, self)
         # only start reading when connection_made() has been called
-        self._loop.call_soon(self._loop.add_reader,
+        self._loop.call_soon(self._loop._add_reader,
                              self._sock_fd, self._read_ready)
         if waiter is not None:
             # only wake up the waiter when connection_made() has been called
@@ -1071,7 +1102,7 @@
                     self._sock.sendto(data, addr)
                 return
             except (BlockingIOError, InterruptedError):
-                self._loop.add_writer(self._sock_fd, self._sendto_ready)
+                self._loop._add_writer(self._sock_fd, self._sendto_ready)
             except OSError as exc:
                 self._protocol.error_received(exc)
                 return
@@ -1105,6 +1136,6 @@
 
         self._maybe_resume_protocol()  # May append to buffer.
         if not self._buffer:
-            self._loop.remove_writer(self._sock_fd)
+            self._loop._remove_writer(self._sock_fd)
             if self._closing:
                 self._call_connection_lost(None)
diff --git a/Lib/asyncio/test_utils.py b/Lib/asyncio/test_utils.py
index 396e6ae..307fffc 100644
--- a/Lib/asyncio/test_utils.py
+++ b/Lib/asyncio/test_utils.py
@@ -13,6 +13,8 @@
 import threading
 import time
 import unittest
+import weakref
+
 from unittest import mock
 
 from http.server import HTTPServer
@@ -300,6 +302,8 @@
         self.writers = {}
         self.reset_counters()
 
+        self._transports = weakref.WeakValueDictionary()
+
     def time(self):
         return self._time
 
@@ -318,10 +322,10 @@
             else:  # pragma: no cover
                 raise AssertionError("Time generator is not finished")
 
-    def add_reader(self, fd, callback, *args):
+    def _add_reader(self, fd, callback, *args):
         self.readers[fd] = events.Handle(callback, args, self)
 
-    def remove_reader(self, fd):
+    def _remove_reader(self, fd):
         self.remove_reader_count[fd] += 1
         if fd in self.readers:
             del self.readers[fd]
@@ -337,10 +341,10 @@
         assert handle._args == args, '{!r} != {!r}'.format(
             handle._args, args)
 
-    def add_writer(self, fd, callback, *args):
+    def _add_writer(self, fd, callback, *args):
         self.writers[fd] = events.Handle(callback, args, self)
 
-    def remove_writer(self, fd):
+    def _remove_writer(self, fd):
         self.remove_writer_count[fd] += 1
         if fd in self.writers:
             del self.writers[fd]
@@ -356,6 +360,36 @@
         assert handle._args == args, '{!r} != {!r}'.format(
             handle._args, args)
 
+    def _ensure_fd_no_transport(self, fd):
+        try:
+            transport = self._transports[fd]
+        except KeyError:
+            pass
+        else:
+            raise RuntimeError(
+                'File descriptor {!r} is used by transport {!r}'.format(
+                    fd, transport))
+
+    def add_reader(self, fd, callback, *args):
+        """Add a reader callback."""
+        self._ensure_fd_no_transport(fd)
+        return self._add_reader(fd, callback, *args)
+
+    def remove_reader(self, fd):
+        """Remove a reader callback."""
+        self._ensure_fd_no_transport(fd)
+        return self._remove_reader(fd)
+
+    def add_writer(self, fd, callback, *args):
+        """Add a writer callback.."""
+        self._ensure_fd_no_transport(fd)
+        return self._add_writer(fd, callback, *args)
+
+    def remove_writer(self, fd):
+        """Remove a writer callback."""
+        self._ensure_fd_no_transport(fd)
+        return self._remove_writer(fd)
+
     def reset_counters(self):
         self.remove_reader_count = collections.defaultdict(int)
         self.remove_writer_count = collections.defaultdict(int)
diff --git a/Lib/asyncio/unix_events.py b/Lib/asyncio/unix_events.py
index ac44218..e1f5c52 100644
--- a/Lib/asyncio/unix_events.py
+++ b/Lib/asyncio/unix_events.py
@@ -321,7 +321,7 @@
 
         self._loop.call_soon(self._protocol.connection_made, self)
         # only start reading when connection_made() has been called
-        self._loop.call_soon(self._loop.add_reader,
+        self._loop.call_soon(self._loop._add_reader,
                              self._fileno, self._read_ready)
         if waiter is not None:
             # only wake up the waiter when connection_made() has been called
@@ -364,15 +364,15 @@
                 if self._loop.get_debug():
                     logger.info("%r was closed by peer", self)
                 self._closing = True
-                self._loop.remove_reader(self._fileno)
+                self._loop._remove_reader(self._fileno)
                 self._loop.call_soon(self._protocol.eof_received)
                 self._loop.call_soon(self._call_connection_lost, None)
 
     def pause_reading(self):
-        self._loop.remove_reader(self._fileno)
+        self._loop._remove_reader(self._fileno)
 
     def resume_reading(self):
-        self._loop.add_reader(self._fileno, self._read_ready)
+        self._loop._add_reader(self._fileno, self._read_ready)
 
     def set_protocol(self, protocol):
         self._protocol = protocol
@@ -412,7 +412,7 @@
 
     def _close(self, exc):
         self._closing = True
-        self._loop.remove_reader(self._fileno)
+        self._loop._remove_reader(self._fileno)
         self._loop.call_soon(self._call_connection_lost, exc)
 
     def _call_connection_lost(self, exc):
@@ -457,7 +457,7 @@
         # works for pipes and sockets. (Exception: OS X 10.4?  Issue #19294.)
         if is_socket or (is_fifo and not sys.platform.startswith("aix")):
             # only start reading when connection_made() has been called
-            self._loop.call_soon(self._loop.add_reader,
+            self._loop.call_soon(self._loop._add_reader,
                                  self._fileno, self._read_ready)
 
         if waiter is not None:
@@ -530,7 +530,7 @@
                 return
             elif n > 0:
                 data = memoryview(data)[n:]
-            self._loop.add_writer(self._fileno, self._write_ready)
+            self._loop._add_writer(self._fileno, self._write_ready)
 
         self._buffer += data
         self._maybe_pause_protocol()
@@ -547,15 +547,15 @@
             self._conn_lost += 1
             # Remove writer here, _fatal_error() doesn't it
             # because _buffer is empty.
-            self._loop.remove_writer(self._fileno)
+            self._loop._remove_writer(self._fileno)
             self._fatal_error(exc, 'Fatal write error on pipe transport')
         else:
             if n == len(self._buffer):
                 self._buffer.clear()
-                self._loop.remove_writer(self._fileno)
+                self._loop._remove_writer(self._fileno)
                 self._maybe_resume_protocol()  # May append to buffer.
                 if self._closing:
-                    self._loop.remove_reader(self._fileno)
+                    self._loop._remove_reader(self._fileno)
                     self._call_connection_lost(None)
                 return
             elif n > 0:
@@ -570,7 +570,7 @@
         assert self._pipe
         self._closing = True
         if not self._buffer:
-            self._loop.remove_reader(self._fileno)
+            self._loop._remove_reader(self._fileno)
             self._loop.call_soon(self._call_connection_lost, None)
 
     def set_protocol(self, protocol):
@@ -616,9 +616,9 @@
     def _close(self, exc=None):
         self._closing = True
         if self._buffer:
-            self._loop.remove_writer(self._fileno)
+            self._loop._remove_writer(self._fileno)
         self._buffer.clear()
-        self._loop.remove_reader(self._fileno)
+        self._loop._remove_reader(self._fileno)
         self._loop.call_soon(self._call_connection_lost, exc)
 
     def _call_connection_lost(self, exc):
