Issue #7777: socket: Add Reliable Datagram Sockets (PF_RDS) support.
diff --git a/Lib/test/test_socket.py b/Lib/test/test_socket.py
index d7a521c..b5e0dc2 100644
--- a/Lib/test/test_socket.py
+++ b/Lib/test/test_socket.py
@@ -47,8 +47,20 @@
         s.close()
     return True
 
+def _have_socket_rds():
+    """Check whether RDS sockets are supported on this host."""
+    try:
+        s = socket.socket(socket.PF_RDS, socket.SOCK_SEQPACKET, 0)
+    except (AttributeError, OSError):
+        return False
+    else:
+        s.close()
+    return True
+
 HAVE_SOCKET_CAN = _have_socket_can()
 
+HAVE_SOCKET_RDS = _have_socket_rds()
+
 # Size in bytes of the int type
 SIZEOF_INT = array.array("i").itemsize
 
@@ -113,6 +125,23 @@
             self.skipTest('network interface `%s` does not exist' %
                            self.interface)
 
+
+class SocketRDSTest(unittest.TestCase):
+
+    """To be able to run this test, the `rds` kernel module must be loaded:
+    # modprobe rds
+    """
+    bufsize = 8192
+
+    def setUp(self):
+        self.serv = socket.socket(socket.PF_RDS, socket.SOCK_SEQPACKET, 0)
+        self.addCleanup(self.serv.close)
+        try:
+            self.port = support.bind_port(self.serv)
+        except OSError:
+            self.skipTest('unable to bind RDS socket')
+
+
 class ThreadableTest:
     """Threadable Test class
 
@@ -271,6 +300,29 @@
         self.cli = None
         ThreadableTest.clientTearDown(self)
 
+class ThreadedRDSSocketTest(SocketRDSTest, ThreadableTest):
+
+    def __init__(self, methodName='runTest'):
+        SocketRDSTest.__init__(self, methodName=methodName)
+        ThreadableTest.__init__(self)
+        self.evt = threading.Event()
+
+    def clientSetUp(self):
+        self.cli = socket.socket(socket.PF_RDS, socket.SOCK_SEQPACKET, 0)
+        try:
+            # RDS sockets must be bound explicitly to send or receive data
+            self.cli.bind((HOST, 0))
+            self.cli_addr = self.cli.getsockname()
+        except OSError:
+            # skipTest should not be called here, and will be called in the
+            # server instead
+            pass
+
+    def clientTearDown(self):
+        self.cli.close()
+        self.cli = None
+        ThreadableTest.clientTearDown(self)
+
 class SocketConnectedTest(ThreadedTCPSocketTest):
     """Socket tests for client-server connection.
 
@@ -1239,6 +1291,112 @@
         self.cli.send(self.cf2)
 
 
+@unittest.skipUnless(HAVE_SOCKET_RDS, 'RDS sockets required for this test.')
+class BasicRDSTest(unittest.TestCase):
+
+    def testCrucialConstants(self):
+        socket.AF_RDS
+        socket.PF_RDS
+
+    def testCreateSocket(self):
+        with socket.socket(socket.PF_RDS, socket.SOCK_SEQPACKET, 0) as s:
+            pass
+
+    def testSocketBufferSize(self):
+        bufsize = 16384
+        with socket.socket(socket.PF_RDS, socket.SOCK_SEQPACKET, 0) as s:
+            s.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, bufsize)
+            s.setsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF, bufsize)
+
+
+@unittest.skipUnless(HAVE_SOCKET_RDS, 'RDS sockets required for this test.')
+@unittest.skipUnless(thread, 'Threading required for this test.')
+class RDSTest(ThreadedRDSSocketTest):
+
+    def __init__(self, methodName='runTest'):
+        ThreadedRDSSocketTest.__init__(self, methodName=methodName)
+
+    def testSendAndRecv(self):
+        data, addr = self.serv.recvfrom(self.bufsize)
+        self.assertEqual(self.data, data)
+        self.assertEqual(self.cli_addr, addr)
+
+    def _testSendAndRecv(self):
+        self.data = b'spam'
+        self.cli.sendto(self.data, 0, (HOST, self.port))
+
+    def testPeek(self):
+        data, addr = self.serv.recvfrom(self.bufsize, socket.MSG_PEEK)
+        self.assertEqual(self.data, data)
+        data, addr = self.serv.recvfrom(self.bufsize)
+        self.assertEqual(self.data, data)
+
+    def _testPeek(self):
+        self.data = b'spam'
+        self.cli.sendto(self.data, 0, (HOST, self.port))
+
+    @requireAttrs(socket.socket, 'recvmsg')
+    def testSendAndRecvMsg(self):
+        data, ancdata, msg_flags, addr = self.serv.recvmsg(self.bufsize)
+        self.assertEqual(self.data, data)
+
+    @requireAttrs(socket.socket, 'sendmsg')
+    def _testSendAndRecvMsg(self):
+        self.data = b'hello ' * 10
+        self.cli.sendmsg([self.data], (), 0, (HOST, self.port))
+
+    def testSendAndRecvMulti(self):
+        data, addr = self.serv.recvfrom(self.bufsize)
+        self.assertEqual(self.data1, data)
+
+        data, addr = self.serv.recvfrom(self.bufsize)
+        self.assertEqual(self.data2, data)
+
+    def _testSendAndRecvMulti(self):
+        self.data1 = b'bacon'
+        self.cli.sendto(self.data1, 0, (HOST, self.port))
+
+        self.data2 = b'egg'
+        self.cli.sendto(self.data2, 0, (HOST, self.port))
+
+    def testSelect(self):
+        r, w, x = select.select([self.serv], [], [], 3.0)
+        self.assertIn(self.serv, r)
+        data, addr = self.serv.recvfrom(self.bufsize)
+        self.assertEqual(self.data, data)
+
+    def _testSelect(self):
+        self.data = b'select'
+        self.cli.sendto(self.data, 0, (HOST, self.port))
+
+    def testCongestion(self):
+        # wait until the sender is done
+        self.evt.wait()
+
+    def _testCongestion(self):
+        # test the behavior in case of congestion
+        self.data = b'fill'
+        self.cli.setblocking(False)
+        try:
+            # try to lower the receiver's socket buffer size
+            self.cli.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, 16384)
+        except OSError:
+            pass
+        with self.assertRaises(OSError) as cm:
+            try:
+                # fill the receiver's socket buffer
+                while True:
+                    self.cli.sendto(self.data, 0, (HOST, self.port))
+            finally:
+                # signal the receiver we're done
+                self.evt.set()
+        # sendto() should have failed with ENOBUFS
+        self.assertEqual(cm.exception.errno, errno.ENOBUFS)
+        # and we should have received a congestion notification through poll
+        r, w, x = select.select([self.serv], [], [], 3.0)
+        self.assertIn(self.serv, r)
+
+
 @unittest.skipUnless(thread, 'Threading required for this test.')
 class BasicTCPTest(SocketConnectedTest):
 
@@ -4362,6 +4520,7 @@
         tests.append(TIPCTest)
         tests.append(TIPCThreadableTest)
     tests.extend([BasicCANTest, CANTest])
+    tests.extend([BasicRDSTest, RDSTest])
     tests.extend([
         CmsgMacroTests,
         SendmsgUDPTest,