Add support for the send/recvmsg API to the socket module. Patch by David Watson and Heiko Wundram. (Closes #6560)
diff --git a/Lib/test/test_socket.py b/Lib/test/test_socket.py
index 4e5085e..bbc9b78 100644
--- a/Lib/test/test_socket.py
+++ b/Lib/test/test_socket.py
@@ -7,6 +7,8 @@
 import io
 import socket
 import select
+import tempfile
+import _testcapi
 import time
 import traceback
 import queue
@@ -34,6 +36,9 @@
     thread = None
     threading = None
 
+# Size in bytes of a C int (item size of array type code "i")
+SIZEOF_INT = array.array("i").itemsize
+
 class SocketTCPTest(unittest.TestCase):
 
     def setUp(self):
@@ -55,6 +60,26 @@
         self.serv.close()
         self.serv = None
 
+class ThreadSafeCleanupTestCase(unittest.TestCase):
+    """Subclass of unittest.TestCase with thread-safe cleanup methods.
+
+    When the threading module is available, addCleanup() and
+    doCleanups() each run while holding a per-instance recursive lock.
+    """
+
+    if threading:  # may be None (see the module-level fallback)
+        def __init__(self, *args, **kwargs):
+            super().__init__(*args, **kwargs)
+            self._cleanup_lock = threading.RLock()
+
+        def addCleanup(self, *args, **kwargs):
+            with self._cleanup_lock:
+                return super().addCleanup(*args, **kwargs)
+
+        def doCleanups(self, *args, **kwargs):
+            with self._cleanup_lock:
+                return super().doCleanups(*args, **kwargs)
+
 class ThreadableTest:
     """Threadable Test class
 
@@ -237,6 +262,243 @@
         ThreadableTest.clientTearDown(self)
 
 
+# The following classes are used by the sendmsg()/recvmsg() tests.
+# Combining, for instance, ConnectedStreamTestMixin and TCPTestBase
+# gives a drop-in replacement for SocketConnectedTest, but different
+# address families can be used, and the attributes serv_addr and
+# cli_addr will be set to the addresses of the endpoints.
+
+class SocketTestBase(unittest.TestCase):
+    """A base class for socket tests.
+
+    Subclasses must provide methods newSocket() to return a new socket
+    and bindSock(sock) to bind it to an unused address.
+
+    setUp() creates a socket self.serv and sets self.serv_addr to its address.
+    """
+
+    def setUp(self):
+        self.serv = self.newSocket()
+        self.bindServer()
+
+    def bindServer(self):
+        """Bind server socket and set self.serv_addr to its address."""
+        self.bindSock(self.serv)
+        self.serv_addr = self.serv.getsockname()
+
+    def tearDown(self):
+        self.serv.close()
+        self.serv = None  # drop the reference to the closed socket
+
+
+class SocketListeningTestMixin(SocketTestBase):
+    """Mixin to listen on the server socket (self.serv) during setUp()."""
+
+    def setUp(self):
+        super().setUp()
+        self.serv.listen(1)  # backlog of 1
+
+
+class ThreadedSocketTestMixin(ThreadSafeCleanupTestCase, SocketTestBase,
+                              ThreadableTest):
+    """Mixin to add client socket and allow client/server tests.
+
+    Client socket is self.cli and its address is self.cli_addr.  See
+    ThreadableTest for usage information.
+    """
+
+    def __init__(self, *args, **kwargs):
+        super().__init__(*args, **kwargs)
+        ThreadableTest.__init__(self)  # called explicitly, without arguments
+
+    def clientSetUp(self):
+        self.cli = self.newClientSocket()
+        self.bindClient()
+
+    def newClientSocket(self):
+        """Return a new socket for use as client."""
+        return self.newSocket()  # same kind of socket as the server's
+
+    def bindClient(self):
+        """Bind client socket and set self.cli_addr to its address."""
+        self.bindSock(self.cli)
+        self.cli_addr = self.cli.getsockname()
+
+    def clientTearDown(self):
+        self.cli.close()
+        self.cli = None
+        ThreadableTest.clientTearDown(self)
+
+
+class ConnectedStreamTestMixin(SocketListeningTestMixin,
+                               ThreadedSocketTestMixin):
+    """Mixin to allow client/server stream tests with connected client.
+
+    Server's socket representing connection to client is self.cli_conn
+    and client's connection to server is self.serv_conn.  (Based on
+    SocketConnectedTest.)
+    """
+
+    def setUp(self):
+        super().setUp()
+        # Indicate explicitly we're ready for the client thread to
+        # proceed and then perform the blocking call to accept
+        self.serverExplicitReady()
+        conn, addr = self.serv.accept()  # addr is not used
+        self.cli_conn = conn
+
+    def tearDown(self):
+        self.cli_conn.close()
+        self.cli_conn = None
+        super().tearDown()
+
+    def clientSetUp(self):
+        super().clientSetUp()
+        self.cli.connect(self.serv_addr)
+        self.serv_conn = self.cli  # alias: the same socket object as self.cli
+
+    def clientTearDown(self):
+        self.serv_conn.close()  # same socket that the inherited teardown closes
+        self.serv_conn = None
+        super().clientTearDown()
+
+
+class UnixSocketTestBase(SocketTestBase):
+    """Base class for Unix-domain socket tests."""
+
+    # This class is used for file descriptor passing tests, so we
+    # create the sockets in a private directory so that other users
+    # can't send anything that might be problematic for a privileged
+    # user running the tests.
+
+    def setUp(self):
+        self.dir_path = tempfile.mkdtemp()
+        self.addCleanup(os.rmdir, self.dir_path)  # registered first, runs last
+        super().setUp()
+
+    def bindSock(self, sock):
+        path = tempfile.mktemp(dir=self.dir_path)  # name only; bind() creates it
+        sock.bind(path)
+        self.addCleanup(support.unlink, path)  # runs before the rmdir (LIFO)
+
+class UnixStreamBase(UnixSocketTestBase):
+    """Base class for Unix-domain SOCK_STREAM tests; provides newSocket()."""
+
+    def newSocket(self):
+        return socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
+
+
+class InetTestBase(SocketTestBase):
+    """Base class for IPv4 socket tests; binds to self.host, sets self.port."""
+
+    host = HOST
+
+    def setUp(self):
+        super().setUp()
+        self.port = self.serv_addr[1]  # second item of the bound address
+
+    def bindSock(self, sock):
+        support.bind_port(sock, host=self.host)
+
+class TCPTestBase(InetTestBase):
+    """Base class for TCP-over-IPv4 tests; provides newSocket()."""
+
+    def newSocket(self):
+        return socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+
+class UDPTestBase(InetTestBase):
+    """Base class for UDP-over-IPv4 tests; provides newSocket()."""
+
+    def newSocket(self):
+        return socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
+
+class SCTPStreamBase(InetTestBase):
+    """Base class for SCTP tests in one-to-one (SOCK_STREAM) mode."""
+
+    def newSocket(self):
+        return socket.socket(socket.AF_INET, socket.SOCK_STREAM,
+                             socket.IPPROTO_SCTP)  # IPv4, explicit protocol
+
+
+class Inet6TestBase(InetTestBase):
+    """Base class for IPv6 socket tests; overrides only the bind host."""
+
+    # Don't use "localhost" here - it may not have an IPv6 address
+    # assigned to it by default (e.g. in /etc/hosts), and if someone
+    # has assigned it an IPv4-mapped address, then it's unlikely to
+    # work with the full IPv6 API.
+    host = "::1"
+
+class UDP6TestBase(Inet6TestBase):
+    """Base class for UDP-over-IPv6 tests; provides newSocket()."""
+
+    def newSocket(self):
+        return socket.socket(socket.AF_INET6, socket.SOCK_DGRAM)
+
+
+# Test-skipping decorators for use with ThreadableTest.
+
+def skipWithClientIf(condition, reason):
+    """Skip decorated test if condition is true, add client_skip decorator.
+
+    Unless the decorated object is a class, it is given an attribute
+    "client_skip": a decorator that substitutes a do-nothing function
+    when the test is skipped and returns its argument unchanged when
+    it is not.  Decorating the client part of a ThreadableTest test
+    with it keeps that part from running when the test is skipped.
+    """
+    def client_pass(*args, **kwargs):
+        pass
+    def skipping(obj):
+        skipped = unittest.skip(reason)(obj)
+        if not isinstance(obj, type):
+            skipped.client_skip = lambda f: client_pass
+        return skipped
+    def passthrough(obj):
+        if not isinstance(obj, type) and not hasattr(obj, "client_skip"):
+            obj.client_skip = lambda f: f
+        return obj
+    return skipping if condition else passthrough
+
+
+def requireAttrs(obj, *attributes):
+    """Skip decorated test unless obj has every one of the given attributes.
+
+    Sets client_skip attribute as skipWithClientIf() does.
+    """
+    absent = [attr for attr in attributes if not hasattr(obj, attr)]
+    return skipWithClientIf(
+        absent, "don't have " + ", ".join(absent))
+
+
+def requireSocket(*args):
+    """Skip decorated test unless socket.socket(*args) can be created.
+
+    String arguments name attributes of the socket module and are
+    replaced by their values; if any such attribute is missing the
+    test is skipped.  Sets client_skip attribute as skipWithClientIf() does.
+    """
+    reason = None
+    absent = [arg for arg in args
+              if isinstance(arg, str) and not hasattr(socket, arg)]
+    if absent:
+        reason = "don't have " + ", ".join(absent)
+    else:
+        resolved = [getattr(socket, arg) if isinstance(arg, str) else arg
+                    for arg in args]
+        try:
+            probe = socket.socket(*resolved)
+        except socket.error as exc:
+            # XXX: check errno?
+            reason = str(exc)
+        else:
+            probe.close()
+    return skipWithClientIf(
+        reason is not None,
+        "can't create socket({0}): {1}".format(
+            ", ".join(map(str, args)), reason))
+
+
 #######################################################################
 ## Begin Tests
 
@@ -945,6 +1207,1839 @@
     def _testRecvFromNegative(self):
         self.cli.sendto(MSG, 0, (HOST, self.port))
 
+
+# Tests for the sendmsg()/recvmsg() interface.  Where possible, the
+# same test code is used with different families and types of socket
+# (e.g. stream, datagram), and tests using recvmsg() are repeated
+# using recvmsg_into().
+#
+# The generic test classes such as SendmsgTests and
+# RecvmsgGenericTests inherit from SendrecvmsgBase and expect to be
+# supplied with sockets cli_sock and serv_sock representing the
+# client's and the server's end of the connection respectively, and
+# attributes cli_addr and serv_addr holding their (numeric where
+# appropriate) addresses.
+#
+# The final concrete test classes combine these with subclasses of
+# SocketTestBase which set up client and server sockets of a specific
+# type, and with subclasses of SendrecvmsgBase such as
+# SendrecvmsgDgramBase and SendrecvmsgConnectedBase which map these
+# sockets to cli_sock and serv_sock and override the methods and
+# attributes of SendrecvmsgBase to fill in destination addresses if
+# needed when sending, check for specific flags in msg_flags, etc.
+#
+# RecvmsgIntoMixin provides a version of doRecvmsg() implemented using
+# recvmsg_into().
+
+# XXX: like the other datagram (UDP) tests in this module, the code
+# here assumes that datagram delivery on the local machine will be
+# reliable.
+
+class SendrecvmsgBase(ThreadSafeCleanupTestCase):
+    # Base class for sendmsg()/recvmsg() tests.
+
+    # Time in seconds to wait before considering a test failed, or
+    # None for no timeout.  Not all tests actually set a timeout.
+    fail_timeout = 3.0
+
+    def setUp(self):
+        self.misc_event = threading.Event()  # for client/server signalling
+        super().setUp()
+
+    def sendToServer(self, msg):
+        # Send msg to the server.
+        return self.cli_sock.send(msg)
+
+    # Tuple of alternative default arguments for sendmsg() when called
+    # via sendmsgToServer() (e.g. to include a destination address).
+    sendmsg_to_server_defaults = ()
+
+    def sendmsgToServer(self, *args):
+        # Call sendmsg() on self.cli_sock with the given arguments,
+        # filling in any arguments which are not supplied with the
+        # corresponding items of self.sendmsg_to_server_defaults, if
+        # any.
+        return self.cli_sock.sendmsg(
+            *(args + self.sendmsg_to_server_defaults[len(args):]))
+
+    def doRecvmsg(self, sock, bufsize, *args):
+        # Call recvmsg() on sock with given arguments and return its
+        # result.  Should be used for tests which can use either
+        # recvmsg() or recvmsg_into() - RecvmsgIntoMixin overrides
+        # this method with one which emulates it using recvmsg_into(),
+        # thus allowing the same test to be used for both methods.
+        result = sock.recvmsg(bufsize, *args)
+        self.registerRecvmsgResult(result)
+        return result
+
+    def registerRecvmsgResult(self, result):
+        # Called by doRecvmsg() with the return value of recvmsg() or
+        # recvmsg_into().  Can be overridden to arrange cleanup based
+        # on the returned ancillary data, for instance.
+        pass
+
+    def checkRecvmsgAddress(self, addr1, addr2):
+        # Called to compare the received address with the address of
+        # the peer.
+        self.assertEqual(addr1, addr2)
+
+    # Flags that are normally unset in msg_flags
+    msg_flags_common_unset = 0
+    for name in ("MSG_CTRUNC", "MSG_OOB"):
+        msg_flags_common_unset |= getattr(socket, name, 0)  # 0 if undefined
+
+    # Flags that are normally set
+    msg_flags_common_set = 0
+
+    # Flags set when a complete record has been received (e.g. MSG_EOR
+    # for SCTP)
+    msg_flags_eor_indicator = 0
+
+    # Flags set when a complete record has not been received
+    # (e.g. MSG_TRUNC for datagram sockets)
+    msg_flags_non_eor_indicator = 0
+
+    def checkFlags(self, flags, eor=None, checkset=0, checkunset=0, ignore=0):
+        # Method to check the value of msg_flags returned by recvmsg[_into]().
+        #
+        # Checks that all bits in msg_flags_common_set attribute are
+        # set in "flags" and all bits in msg_flags_common_unset are
+        # unset.
+        #
+        # The "eor" argument specifies whether the flags should
+        # indicate that a full record (or datagram) has been received.
+        # If "eor" is None, no checks are done; otherwise, checks
+        # that:
+        #
+        #  * if "eor" is true, all bits in msg_flags_eor_indicator are
+        #    set and all bits in msg_flags_non_eor_indicator are unset
+        #
+        #  * if "eor" is false, all bits in msg_flags_non_eor_indicator
+        #    are set and all bits in msg_flags_eor_indicator are unset
+        #
+        # If "checkset" and/or "checkunset" are supplied, they require
+        # the given bits to be set or unset respectively, overriding
+        # what the attributes require for those bits.
+        #
+        # If any bits are set in "ignore", they will not be checked,
+        # regardless of the other inputs.
+        #
+        # Will raise Exception if the inputs require a bit to be both
+        # set and unset, and it is not ignored.
+
+        defaultset = self.msg_flags_common_set
+        defaultunset = self.msg_flags_common_unset
+
+        if eor:
+            defaultset |= self.msg_flags_eor_indicator
+            defaultunset |= self.msg_flags_non_eor_indicator
+        elif eor is not None:
+            defaultset |= self.msg_flags_non_eor_indicator
+            defaultunset |= self.msg_flags_eor_indicator
+
+        # Function arguments override defaults
+        defaultset &= ~checkunset
+        defaultunset &= ~checkset
+
+        # Merge arguments with remaining defaults, and check for conflicts
+        checkset |= defaultset
+        checkunset |= defaultunset
+        inboth = checkset & checkunset & ~ignore
+        if inboth:
+            raise Exception("contradictory set, unset requirements for flags "
+                            "{0:#x}".format(inboth))
+
+        # Compare only the checked, non-ignored bits of the msg_flags value
+        mask = (checkset | checkunset) & ~ignore
+        self.assertEqual(flags & mask, checkset & mask)
+
+
+class RecvmsgIntoMixin(SendrecvmsgBase):
+    # Mixin implementing doRecvmsg() via recvmsg_into(), returning bytes data.
+
+    def doRecvmsg(self, sock, bufsize, *args):
+        buf = bytearray(bufsize)
+        result = sock.recvmsg_into([buf], *args)
+        self.registerRecvmsgResult(result)
+        self.assertGreaterEqual(result[0], 0)  # result[0] is the byte count
+        self.assertLessEqual(result[0], bufsize)
+        return (bytes(buf[:result[0]]),) + result[1:]
+
+
+class SendrecvmsgDgramFlagsBase(SendrecvmsgBase):
+    # Datagram sockets: adds MSG_TRUNC to the "incomplete record" flags.
+
+    @property
+    def msg_flags_non_eor_indicator(self):
+        return super().msg_flags_non_eor_indicator | socket.MSG_TRUNC
+
+
+class SendrecvmsgSCTPFlagsBase(SendrecvmsgBase):
+    # SCTP sockets: adds MSG_EOR to the "complete record" flags.
+
+    @property
+    def msg_flags_eor_indicator(self):
+        return super().msg_flags_eor_indicator | socket.MSG_EOR
+
+
+class SendrecvmsgConnectionlessBase(SendrecvmsgBase):
+    # Base class for tests on connectionless-mode sockets.  Users must
+    # supply sockets on attributes cli and serv to be mapped to
+    # cli_sock and serv_sock respectively.
+
+    @property
+    def serv_sock(self):
+        return self.serv
+
+    @property
+    def cli_sock(self):
+        return self.cli
+
+    @property
+    def sendmsg_to_server_defaults(self):
+        return ([], [], 0, self.serv_addr)  # buffers, ancdata, flags, addr
+
+    def sendToServer(self, msg):
+        return self.cli_sock.sendto(msg, self.serv_addr)
+
+
+class SendrecvmsgConnectedBase(SendrecvmsgBase):
+    # Base class for tests on connected sockets.  Users must supply
+    # sockets on attributes serv_conn and cli_conn (representing the
+    # connections *to* the server and the client), to be mapped to
+    # cli_sock and serv_sock respectively.
+
+    @property
+    def serv_sock(self):
+        return self.cli_conn  # the server's connection to the client
+
+    @property
+    def cli_sock(self):
+        return self.serv_conn  # the client's connection to the server
+
+    def checkRecvmsgAddress(self, addr1, addr2):
+        # Address is currently "unspecified" for a connected socket,
+        # so we don't examine it
+        pass
+
+
+class SendrecvmsgServerTimeoutBase(SendrecvmsgBase):
+    # Base class to set a timeout (self.fail_timeout) on server's socket.
+
+    def setUp(self):
+        super().setUp()
+        self.serv_sock.settimeout(self.fail_timeout)  # in seconds
+
+
+class SendmsgTests(SendrecvmsgServerTimeoutBase):
+    # Tests for sendmsg() which can use any socket type and do not
+    # involve recvmsg() or recvmsg_into().
+
+    def testSendmsg(self):
+        # Send a simple message with sendmsg().
+        self.assertEqual(self.serv_sock.recv(len(MSG)), MSG)
+
+    def _testSendmsg(self):
+        self.assertEqual(self.sendmsgToServer([MSG]), len(MSG))
+
+    def testSendmsgDataGenerator(self):
+        # Send from buffer obtained from a generator (not a sequence).
+        self.assertEqual(self.serv_sock.recv(len(MSG)), MSG)
+
+    def _testSendmsgDataGenerator(self):
+        self.assertEqual(self.sendmsgToServer((o for o in [MSG])),
+                         len(MSG))
+
+    def testSendmsgAncillaryGenerator(self):
+        # Gather (empty) ancillary data from a generator.
+        self.assertEqual(self.serv_sock.recv(len(MSG)), MSG)
+
+    def _testSendmsgAncillaryGenerator(self):
+        self.assertEqual(self.sendmsgToServer([MSG], (o for o in [])),
+                         len(MSG))
+
+    def testSendmsgArray(self):
+        # Send data from an array instead of the usual bytes object.
+        self.assertEqual(self.serv_sock.recv(len(MSG)), MSG)
+
+    def _testSendmsgArray(self):
+        self.assertEqual(self.sendmsgToServer([array.array("B", MSG)]),
+                         len(MSG))
+
+    def testSendmsgGather(self):
+        # Send message data from more than one buffer (gather write).
+        self.assertEqual(self.serv_sock.recv(len(MSG)), MSG)
+
+    def _testSendmsgGather(self):
+        self.assertEqual(self.sendmsgToServer([MSG[:3], MSG[3:]]), len(MSG))
+
+    def testSendmsgBadArgs(self):
+        # Check that sendmsg() rejects invalid arguments.
+        self.assertEqual(self.serv_sock.recv(1000), b"done")
+
+    def _testSendmsgBadArgs(self):
+        self.assertRaises(TypeError, self.cli_sock.sendmsg)
+        self.assertRaises(TypeError, self.sendmsgToServer,
+                          b"not in an iterable")
+        self.assertRaises(TypeError, self.sendmsgToServer,
+                          object())
+        self.assertRaises(TypeError, self.sendmsgToServer,
+                          [object()])
+        self.assertRaises(TypeError, self.sendmsgToServer,
+                          [MSG, object()])
+        self.assertRaises(TypeError, self.sendmsgToServer,
+                          [MSG], object())
+        self.assertRaises(TypeError, self.sendmsgToServer,
+                          [MSG], [], object())
+        self.assertRaises(TypeError, self.sendmsgToServer,
+                          [MSG], [], 0, object())
+        self.sendToServer(b"done")  # what the server side's recv() expects
+
+    def testSendmsgBadCmsg(self):
+        # Check that invalid ancillary data items are rejected.
+        self.assertEqual(self.serv_sock.recv(1000), b"done")
+
+    def _testSendmsgBadCmsg(self):
+        self.assertRaises(TypeError, self.sendmsgToServer,
+                          [MSG], [object()])
+        self.assertRaises(TypeError, self.sendmsgToServer,
+                          [MSG], [(object(), 0, b"data")])
+        self.assertRaises(TypeError, self.sendmsgToServer,
+                          [MSG], [(0, object(), b"data")])
+        self.assertRaises(TypeError, self.sendmsgToServer,
+                          [MSG], [(0, 0, object())])
+        self.assertRaises(TypeError, self.sendmsgToServer,
+                          [MSG], [(0, 0)])
+        self.assertRaises(TypeError, self.sendmsgToServer,
+                          [MSG], [(0, 0, b"data", 42)])
+        self.sendToServer(b"done")  # what the server side's recv() expects
+
+    @requireAttrs(socket, "CMSG_SPACE")
+    def testSendmsgBadMultiCmsg(self):
+        # Check that invalid ancillary data items are rejected when
+        # more than one item is present.
+        self.assertEqual(self.serv_sock.recv(1000), b"done")
+
+    @testSendmsgBadMultiCmsg.client_skip
+    def _testSendmsgBadMultiCmsg(self):
+        self.assertRaises(TypeError, self.sendmsgToServer,
+                          [MSG], [0, 0, b""])
+        self.assertRaises(TypeError, self.sendmsgToServer,
+                          [MSG], [(0, 0, b""), object()])
+        self.sendToServer(b"done")  # what the server side's recv() expects
+
+    def testSendmsgExcessCmsgReject(self):
+        # Check that sendmsg() rejects excess ancillary data items
+        # when the number that can be sent is limited.
+        self.assertEqual(self.serv_sock.recv(1000), b"done")
+
+    def _testSendmsgExcessCmsgReject(self):
+        if not hasattr(socket, "CMSG_SPACE"):
+            # Can only send one item
+            with self.assertRaises(socket.error) as cm:
+                self.sendmsgToServer([MSG], [(0, 0, b""), (0, 0, b"")])
+            self.assertIsNone(cm.exception.errno)  # no errno attached
+        self.sendToServer(b"done")  # sent whether or not the check ran
+
+    def testSendmsgAfterClose(self):
+        # Check that sendmsg() fails on a closed socket.
+        pass  # the check itself is in the client part
+
+    def _testSendmsgAfterClose(self):
+        self.cli_sock.close()
+        self.assertRaises(socket.error, self.sendmsgToServer, [MSG])
+
+
+class SendmsgStreamTests(SendmsgTests):
+    # Tests for sendmsg() which require a stream socket and do not
+    # involve recvmsg() or recvmsg_into().
+
+    def testSendmsgExplicitNoneAddr(self):
+        # Check that peer address can be specified as None.
+        self.assertEqual(self.serv_sock.recv(len(MSG)), MSG)
+
+    def _testSendmsgExplicitNoneAddr(self):
+        self.assertEqual(self.sendmsgToServer([MSG], [], 0, None), len(MSG))
+
+    def testSendmsgTimeout(self):
+        # Check that timeout works with sendmsg().
+        self.assertEqual(self.serv_sock.recv(512), b"a"*512)
+        self.assertTrue(self.misc_event.wait(timeout=self.fail_timeout))
+
+    def _testSendmsgTimeout(self):
+        try:
+            self.cli_sock.settimeout(0.03)
+            with self.assertRaises(socket.timeout):
+                while True:  # keep sending until sendmsg() times out
+                    self.sendmsgToServer([b"a"*512])
+        finally:
+            self.misc_event.set()  # always release the waiting server side
+
+    # XXX: would be nice to have more tests for sendmsg flags argument.
+
+    # Linux supports MSG_DONTWAIT when sending, but in general it only
+    # works when receiving.  Match any "linux*" sys.platform value
+    # ("linux", "linux2", ...); add other platforms if they support it.
+    @skipWithClientIf(not sys.platform.startswith("linux"),
+                      "MSG_DONTWAIT not known to work on this platform when "
+                      "sending")
+    def testSendmsgDontWait(self):
+        # Check that MSG_DONTWAIT in flags causes non-blocking behaviour.
+        self.assertEqual(self.serv_sock.recv(512), b"a"*512)
+        self.assertTrue(self.misc_event.wait(timeout=self.fail_timeout))
+
+    @testSendmsgDontWait.client_skip
+    def _testSendmsgDontWait(self):
+        try:
+            with self.assertRaises(socket.error) as cm:
+                while True:  # keep sending until sendmsg() would block
+                    self.sendmsgToServer([b"a"*512], [], socket.MSG_DONTWAIT)
+            self.assertIn(cm.exception.errno,
+                          (errno.EAGAIN, errno.EWOULDBLOCK))
+        finally:
+            self.misc_event.set()  # always release the waiting server side
+
+
+class SendmsgConnectionlessTests(SendmsgTests):
+    # Tests for sendmsg() which require a connectionless-mode
+    # (e.g. datagram) socket, and do not involve recvmsg() or
+    # recvmsg_into().
+
+    def testSendmsgNoDestAddr(self):
+        # Check that sendmsg() fails when no destination address is
+        # given for unconnected socket.
+        pass  # the check itself is in the client part
+
+    def _testSendmsgNoDestAddr(self):
+        self.assertRaises(socket.error, self.cli_sock.sendmsg,
+                          [MSG])  # address argument omitted
+        self.assertRaises(socket.error, self.cli_sock.sendmsg,
+                          [MSG], [], 0, None)  # address given as None
+
+
+class RecvmsgGenericTests(SendrecvmsgBase):
+    # Tests for recvmsg() which can also be emulated using
+    # recvmsg_into(), and can use any socket type.
+
+    def testRecvmsg(self):
+        # Receive a simple message with recvmsg[_into]().
+        msg, ancdata, flags, addr = self.doRecvmsg(self.serv_sock, len(MSG))
+        self.assertEqual(msg, MSG)
+        self.checkRecvmsgAddress(addr, self.cli_addr)
+        self.assertEqual(ancdata, [])
+        self.checkFlags(flags, eor=True)
+
+    def _testRecvmsg(self):
+        self.sendToServer(MSG)
+
+    def testRecvmsgExplicitDefaults(self):
+        # Test recvmsg[_into]() with default arguments provided explicitly.
+        msg, ancdata, flags, addr = self.doRecvmsg(self.serv_sock,
+                                                   len(MSG), 0, 0)  # ancbufsize, flags
+        self.assertEqual(msg, MSG)
+        self.checkRecvmsgAddress(addr, self.cli_addr)
+        self.assertEqual(ancdata, [])
+        self.checkFlags(flags, eor=True)
+
+    def _testRecvmsgExplicitDefaults(self):
+        self.sendToServer(MSG)
+
+    def testRecvmsgShorter(self):
+        # Receive a message smaller than buffer.
+        msg, ancdata, flags, addr = self.doRecvmsg(self.serv_sock,
+                                                   len(MSG) + 42)
+        self.assertEqual(msg, MSG)
+        self.checkRecvmsgAddress(addr, self.cli_addr)
+        self.assertEqual(ancdata, [])
+        self.checkFlags(flags, eor=True)
+
+    def _testRecvmsgShorter(self):
+        self.sendToServer(MSG)
+
+    def testRecvmsgTrunc(self):
+        # Receive part of message, check for truncation indicators.
+        msg, ancdata, flags, addr = self.doRecvmsg(self.serv_sock,
+                                                   len(MSG) - 3)
+        self.assertEqual(msg, MSG[:-3])
+        self.checkRecvmsgAddress(addr, self.cli_addr)
+        self.assertEqual(ancdata, [])
+        self.checkFlags(flags, eor=False)
+
+    def _testRecvmsgTrunc(self):
+        self.sendToServer(MSG)
+
+    def testRecvmsgShortAncillaryBuf(self):
+        # Test ancillary data buffer too small to hold any ancillary data.
+        msg, ancdata, flags, addr = self.doRecvmsg(self.serv_sock,
+                                                   len(MSG), 1)  # ancbufsize=1
+        self.assertEqual(msg, MSG)
+        self.checkRecvmsgAddress(addr, self.cli_addr)
+        self.assertEqual(ancdata, [])
+        self.checkFlags(flags, eor=True)
+
+    def _testRecvmsgShortAncillaryBuf(self):
+        self.sendToServer(MSG)
+
+    def testRecvmsgLongAncillaryBuf(self):
+        # Test large ancillary data buffer.
+        msg, ancdata, flags, addr = self.doRecvmsg(self.serv_sock,
+                                                   len(MSG), 10240)  # ancbufsize
+        self.assertEqual(msg, MSG)
+        self.checkRecvmsgAddress(addr, self.cli_addr)
+        self.assertEqual(ancdata, [])
+        self.checkFlags(flags, eor=True)
+
+    def _testRecvmsgLongAncillaryBuf(self):
+        self.sendToServer(MSG)
+
+    def testRecvmsgAfterClose(self):
+        # Check that recvmsg[_into]() fails on a closed socket.
+        self.serv_sock.close()
+        self.assertRaises(socket.error, self.doRecvmsg, self.serv_sock, 1024)
+
+    def _testRecvmsgAfterClose(self):
+        pass  # the client sends nothing in this test
+
+    def testRecvmsgTimeout(self):
+        # Check that timeout works.
+        try:
+            self.serv_sock.settimeout(0.03)
+            self.assertRaises(socket.timeout,
+                              self.doRecvmsg, self.serv_sock, len(MSG))
+        finally:
+            self.misc_event.set()  # always release the waiting client side
+
+    def _testRecvmsgTimeout(self):
+        self.assertTrue(self.misc_event.wait(timeout=self.fail_timeout))
+
+    @requireAttrs(socket, "MSG_PEEK")
+    def testRecvmsgPeek(self):
+        # Check that MSG_PEEK in flags enables examination of pending
+        # data without consuming it.
+
+        # Receive part of data with MSG_PEEK.
+        msg, ancdata, flags, addr = self.doRecvmsg(self.serv_sock,
+                                                   len(MSG) - 3, 0,
+                                                   socket.MSG_PEEK)
+        self.assertEqual(msg, MSG[:-3])
+        self.checkRecvmsgAddress(addr, self.cli_addr)
+        self.assertEqual(ancdata, [])
+        # Ignoring MSG_TRUNC here (so this test is the same for stream
+        # and datagram sockets).  Some wording in POSIX seems to
+        # suggest that it needn't be set when peeking, but that may
+        # just be a slip.
+        self.checkFlags(flags, eor=False,
+                        ignore=getattr(socket, "MSG_TRUNC", 0))
+
+        # Receive all data with MSG_PEEK.
+        msg, ancdata, flags, addr = self.doRecvmsg(self.serv_sock,
+                                                   len(MSG), 0,
+                                                   socket.MSG_PEEK)
+        self.assertEqual(msg, MSG)
+        self.checkRecvmsgAddress(addr, self.cli_addr)
+        self.assertEqual(ancdata, [])
+        self.checkFlags(flags, eor=True)
+
+        # Check that the same data can still be received normally.
+        msg, ancdata, flags, addr = self.doRecvmsg(self.serv_sock, len(MSG))
+        self.assertEqual(msg, MSG)
+        self.checkRecvmsgAddress(addr, self.cli_addr)
+        self.assertEqual(ancdata, [])
+        self.checkFlags(flags, eor=True)
+
+    @testRecvmsgPeek.client_skip
+    def _testRecvmsgPeek(self):
+        self.sendToServer(MSG)
+
+    @requireAttrs(socket.socket, "sendmsg")
+    def testRecvmsgFromSendmsg(self):
+        # Test receiving with recvmsg[_into]() when message is sent
+        # using sendmsg().
+        self.serv_sock.settimeout(self.fail_timeout)
+        msg, ancdata, flags, addr = self.doRecvmsg(self.serv_sock, len(MSG))
+        self.assertEqual(msg, MSG)
+        self.checkRecvmsgAddress(addr, self.cli_addr)
+        self.assertEqual(ancdata, [])
+        self.checkFlags(flags, eor=True)
+
+    @testRecvmsgFromSendmsg.client_skip
+    def _testRecvmsgFromSendmsg(self):
+        self.assertEqual(self.sendmsgToServer([MSG[:3], MSG[3:]]), len(MSG))
+
+
+class RecvmsgGenericStreamTests(RecvmsgGenericTests):
+    # Tests which require a stream socket and can use either recvmsg()
+    # or recvmsg_into().
+
+    def testRecvmsgEOF(self):
+        # Receive end-of-stream indicator (b"", peer socket closed).
+        msg, ancdata, flags, addr = self.doRecvmsg(self.serv_sock, 1024)
+        self.assertEqual(msg, b"")
+        self.checkRecvmsgAddress(addr, self.cli_addr)
+        self.assertEqual(ancdata, [])
+        self.checkFlags(flags, eor=None) # Might not have end-of-record marker
+
+    def _testRecvmsgEOF(self):
+        self.cli_sock.close()  # closing the peer produces the EOF above
+
+    def testRecvmsgOverflow(self):
+        # Receive a message in more than one chunk.
+        seg1, ancdata, flags, addr = self.doRecvmsg(self.serv_sock,
+                                                    len(MSG) - 3)
+        self.checkRecvmsgAddress(addr, self.cli_addr)
+        self.assertEqual(ancdata, [])
+        self.checkFlags(flags, eor=False)
+
+        seg2, ancdata, flags, addr = self.doRecvmsg(self.serv_sock, 1024)
+        self.checkRecvmsgAddress(addr, self.cli_addr)
+        self.assertEqual(ancdata, [])
+        self.checkFlags(flags, eor=True)
+
+        msg = seg1 + seg2  # the two chunks together must equal MSG
+        self.assertEqual(msg, MSG)
+
+    def _testRecvmsgOverflow(self):
+        self.sendToServer(MSG)
+
+
+class RecvmsgTests(RecvmsgGenericTests):
+    # Tests for recvmsg() which can use any socket type.
+
+    def testRecvmsgBadArgs(self):
+        # Check that recvmsg() rejects invalid arguments.
+        self.assertRaises(TypeError, self.serv_sock.recvmsg)
+        self.assertRaises(ValueError, self.serv_sock.recvmsg,
+                          -1, 0, 0)
+        self.assertRaises(ValueError, self.serv_sock.recvmsg,
+                          len(MSG), -1, 0)
+        self.assertRaises(TypeError, self.serv_sock.recvmsg,
+                          [bytearray(10)], 0, 0)
+        self.assertRaises(TypeError, self.serv_sock.recvmsg,
+                          object(), 0, 0)
+        self.assertRaises(TypeError, self.serv_sock.recvmsg,
+                          len(MSG), object(), 0)
+        self.assertRaises(TypeError, self.serv_sock.recvmsg,
+                          len(MSG), 0, object())
+
+        msg, ancdata, flags, addr = self.serv_sock.recvmsg(len(MSG), 0, 0)
+        self.assertEqual(msg, MSG)
+        self.checkRecvmsgAddress(addr, self.cli_addr)
+        self.assertEqual(ancdata, [])
+        self.checkFlags(flags, eor=True)
+
+    def _testRecvmsgBadArgs(self):
+        self.sendToServer(MSG)
+
+
+class RecvmsgIntoTests(RecvmsgIntoMixin, RecvmsgGenericTests):
+    # Tests for recvmsg_into() which can use any socket type.
+
+    def testRecvmsgIntoBadArgs(self):
+        # Check that recvmsg_into() rejects invalid arguments.
+        buf = bytearray(len(MSG))
+        self.assertRaises(TypeError, self.serv_sock.recvmsg_into)
+        self.assertRaises(TypeError, self.serv_sock.recvmsg_into,
+                          len(MSG), 0, 0)
+        self.assertRaises(TypeError, self.serv_sock.recvmsg_into,
+                          buf, 0, 0)
+        self.assertRaises(TypeError, self.serv_sock.recvmsg_into,
+                          [object()], 0, 0)
+        self.assertRaises(TypeError, self.serv_sock.recvmsg_into,
+                          [b"I'm not writable"], 0, 0)
+        self.assertRaises(TypeError, self.serv_sock.recvmsg_into,
+                          [buf, object()], 0, 0)
+        self.assertRaises(ValueError, self.serv_sock.recvmsg_into,
+                          [buf], -1, 0)
+        self.assertRaises(TypeError, self.serv_sock.recvmsg_into,
+                          [buf], object(), 0)
+        self.assertRaises(TypeError, self.serv_sock.recvmsg_into,
+                          [buf], 0, object())
+
+        nbytes, ancdata, flags, addr = self.serv_sock.recvmsg_into([buf], 0, 0)
+        self.assertEqual(nbytes, len(MSG))
+        self.assertEqual(buf, bytearray(MSG))
+        self.checkRecvmsgAddress(addr, self.cli_addr)
+        self.assertEqual(ancdata, [])
+        self.checkFlags(flags, eor=True)
+
+    def _testRecvmsgIntoBadArgs(self):
+        self.sendToServer(MSG)
+
+    def testRecvmsgIntoGenerator(self):
+        # Receive into buffer obtained from a generator (not a sequence).
+        buf = bytearray(len(MSG))
+        nbytes, ancdata, flags, addr = self.serv_sock.recvmsg_into(
+            (o for o in [buf]))
+        self.assertEqual(nbytes, len(MSG))
+        self.assertEqual(buf, bytearray(MSG))
+        self.checkRecvmsgAddress(addr, self.cli_addr)
+        self.assertEqual(ancdata, [])
+        self.checkFlags(flags, eor=True)
+
+    def _testRecvmsgIntoGenerator(self):
+        self.sendToServer(MSG)
+
+    def testRecvmsgIntoArray(self):
+        # Receive into an array rather than the usual bytearray.
+        buf = array.array("B", [0] * len(MSG))
+        nbytes, ancdata, flags, addr = self.serv_sock.recvmsg_into([buf])
+        self.assertEqual(nbytes, len(MSG))
+        self.assertEqual(buf.tobytes(), MSG)
+        self.checkRecvmsgAddress(addr, self.cli_addr)
+        self.assertEqual(ancdata, [])
+        self.checkFlags(flags, eor=True)
+
+    def _testRecvmsgIntoArray(self):
+        self.sendToServer(MSG)
+
+    def testRecvmsgIntoScatter(self):
+        # Receive into multiple buffers (scatter write).
+        b1 = bytearray(b"----")
+        b2 = bytearray(b"0123456789")
+        b3 = bytearray(b"--------------")
+        nbytes, ancdata, flags, addr = self.serv_sock.recvmsg_into(
+            [b1, memoryview(b2)[2:9], b3])
+        self.assertEqual(nbytes, len(b"Mary had a little lamb"))
+        self.assertEqual(b1, bytearray(b"Mary"))
+        self.assertEqual(b2, bytearray(b"01 had a 9"))
+        self.assertEqual(b3, bytearray(b"little lamb---"))
+        self.checkRecvmsgAddress(addr, self.cli_addr)
+        self.assertEqual(ancdata, [])
+        self.checkFlags(flags, eor=True)
+
+    def _testRecvmsgIntoScatter(self):
+        self.sendToServer(b"Mary had a little lamb")
+
+
+class CmsgMacroTests(unittest.TestCase):
+    # Test the functions CMSG_LEN() and CMSG_SPACE().  Tests
+    # assumptions used by sendmsg() and recvmsg[_into](), which share
+    # code with these functions.
+
+    # Match the definition in socketmodule.c
+    socklen_t_limit = min(0x7fffffff, _testcapi.INT_MAX)
+
+    @requireAttrs(socket, "CMSG_LEN")
+    def testCMSG_LEN(self):
+        # Test CMSG_LEN() with various valid and invalid values,
+        # checking the assumptions used by recvmsg() and sendmsg().
+        toobig = self.socklen_t_limit - socket.CMSG_LEN(0) + 1
+        values = list(range(257)) + list(range(toobig - 257, toobig))
+
+        # struct cmsghdr has at least three members, two of which are ints
+        self.assertGreater(socket.CMSG_LEN(0), array.array("i").itemsize * 2)
+        for n in values:
+            ret = socket.CMSG_LEN(n)
+            # This is how recvmsg() calculates the data size
+            self.assertEqual(ret - socket.CMSG_LEN(0), n)
+            self.assertLessEqual(ret, self.socklen_t_limit)
+
+        self.assertRaises(OverflowError, socket.CMSG_LEN, -1)
+        # sendmsg() shares code with these functions, and requires
+        # that it reject values over the limit.
+        self.assertRaises(OverflowError, socket.CMSG_LEN, toobig)
+        self.assertRaises(OverflowError, socket.CMSG_LEN, sys.maxsize)
+
+    @requireAttrs(socket, "CMSG_SPACE")
+    def testCMSG_SPACE(self):
+        # Test CMSG_SPACE() with various valid and invalid values,
+        # checking the assumptions used by sendmsg().
+        toobig = self.socklen_t_limit - socket.CMSG_SPACE(1) + 1
+        values = list(range(257)) + list(range(toobig - 257, toobig))
+
+        last = socket.CMSG_SPACE(0)
+        # struct cmsghdr has at least three members, two of which are ints
+        self.assertGreater(last, array.array("i").itemsize * 2)
+        for n in values:
+            ret = socket.CMSG_SPACE(n)
+            self.assertGreaterEqual(ret, last)
+            self.assertGreaterEqual(ret, socket.CMSG_LEN(n))
+            self.assertGreaterEqual(ret, n + socket.CMSG_LEN(0))
+            self.assertLessEqual(ret, self.socklen_t_limit)
+            last = ret
+
+        self.assertRaises(OverflowError, socket.CMSG_SPACE, -1)
+        # sendmsg() shares code with these functions, and requires
+        # that it reject values over the limit.
+        self.assertRaises(OverflowError, socket.CMSG_SPACE, toobig)
+        self.assertRaises(OverflowError, socket.CMSG_SPACE, sys.maxsize)
+
+
+class SCMRightsTest(SendrecvmsgServerTimeoutBase):
+    # Tests for file descriptor passing on Unix-domain sockets.
+
+    # Invalid file descriptor value that's unlikely to evaluate to a
+    # real FD even if one of its bytes is replaced with a different
+    # value (which shouldn't actually happen).
+    badfd = -0x5555
+
+    def newFDs(self, n):
+        # Return a list of n file descriptors for newly-created files
+        # containing their list indices as ASCII numbers.
+        fds = []
+        for i in range(n):
+            fd, path = tempfile.mkstemp()
+            self.addCleanup(os.unlink, path)
+            self.addCleanup(os.close, fd)
+            os.write(fd, str(i).encode())
+            fds.append(fd)
+        return fds
+
+    def checkFDs(self, fds):
+        # Check that the file descriptors in the given list contain
+        # their correct list indices as ASCII numbers.
+        for n, fd in enumerate(fds):
+            os.lseek(fd, 0, os.SEEK_SET)
+            self.assertEqual(os.read(fd, 1024), str(n).encode())
+
+    def registerRecvmsgResult(self, result):
+        self.addCleanup(self.closeRecvmsgFDs, result)
+
+    def closeRecvmsgFDs(self, recvmsg_result):
+        # Close all file descriptors specified in the ancillary data
+        # of the given return value from recvmsg() or recvmsg_into().
+        for cmsg_level, cmsg_type, cmsg_data in recvmsg_result[1]:
+            if (cmsg_level == socket.SOL_SOCKET and
+                    cmsg_type == socket.SCM_RIGHTS):
+                fds = array.array("i")
+                fds.frombytes(cmsg_data[:
+                        len(cmsg_data) - (len(cmsg_data) % fds.itemsize)])
+                for fd in fds:
+                    os.close(fd)
+
+    def createAndSendFDs(self, n):
+        # Send n new file descriptors created by newFDs() to the
+        # server, with the constant MSG as the non-ancillary data.
+        self.assertEqual(
+            self.sendmsgToServer([MSG],
+                                 [(socket.SOL_SOCKET,
+                                   socket.SCM_RIGHTS,
+                                   array.array("i", self.newFDs(n)))]),
+            len(MSG))
+
+    def checkRecvmsgFDs(self, numfds, result, maxcmsgs=1, ignoreflags=0):
+        # Check that constant MSG was received with numfds file
+        # descriptors in a maximum of maxcmsgs control messages (which
+        # must contain only complete integers).  By default, check
+        # that MSG_CTRUNC is unset, but ignore any flags in
+        # ignoreflags.
+        msg, ancdata, flags, addr = result
+        self.assertEqual(msg, MSG)
+        self.checkRecvmsgAddress(addr, self.cli_addr)
+        self.checkFlags(flags, eor=True, checkunset=socket.MSG_CTRUNC,
+                        ignore=ignoreflags)
+
+        self.assertIsInstance(ancdata, list)
+        self.assertLessEqual(len(ancdata), maxcmsgs)
+        fds = array.array("i")
+        for item in ancdata:
+            self.assertIsInstance(item, tuple)
+            cmsg_level, cmsg_type, cmsg_data = item
+            self.assertEqual(cmsg_level, socket.SOL_SOCKET)
+            self.assertEqual(cmsg_type, socket.SCM_RIGHTS)
+            self.assertIsInstance(cmsg_data, bytes)
+            self.assertEqual(len(cmsg_data) % SIZEOF_INT, 0)
+            fds.frombytes(cmsg_data)
+
+        self.assertEqual(len(fds), numfds)
+        self.checkFDs(fds)
+
+    def testFDPassSimple(self):
+        # Pass a single FD (array read from bytes object).
+        self.checkRecvmsgFDs(1, self.doRecvmsg(self.serv_sock,
+                                               len(MSG), 10240))
+
+    def _testFDPassSimple(self):
+        self.assertEqual(
+            self.sendmsgToServer(
+                [MSG],
+                [(socket.SOL_SOCKET,
+                  socket.SCM_RIGHTS,
+                  array.array("i", self.newFDs(1)).tobytes())]),
+            len(MSG))
+
+    def testMultipleFDPass(self):
+        # Pass multiple FDs in a single array.
+        self.checkRecvmsgFDs(4, self.doRecvmsg(self.serv_sock,
+                                               len(MSG), 10240))
+
+    def _testMultipleFDPass(self):
+        self.createAndSendFDs(4)
+
+    @requireAttrs(socket, "CMSG_SPACE")
+    def testFDPassCMSG_SPACE(self):
+        # Test using CMSG_SPACE() to calculate ancillary buffer size.
+        self.checkRecvmsgFDs(
+            4, self.doRecvmsg(self.serv_sock, len(MSG),
+                              socket.CMSG_SPACE(4 * SIZEOF_INT)))
+
+    @testFDPassCMSG_SPACE.client_skip
+    def _testFDPassCMSG_SPACE(self):
+        self.createAndSendFDs(4)
+
+    def testFDPassCMSG_LEN(self):
+        # Test using CMSG_LEN() to calculate ancillary buffer size.
+        self.checkRecvmsgFDs(1,
+                             self.doRecvmsg(self.serv_sock, len(MSG),
+                                            socket.CMSG_LEN(4 * SIZEOF_INT)),
+                             # RFC 3542 says implementations may set
+                             # MSG_CTRUNC if there isn't enough space
+                             # for trailing padding.
+                             ignoreflags=socket.MSG_CTRUNC)
+
+    def _testFDPassCMSG_LEN(self):
+        self.createAndSendFDs(1)
+
+    @requireAttrs(socket, "CMSG_SPACE")
+    def testFDPassSeparate(self):
+        # Pass two FDs in two separate arrays.  Arrays may be combined
+        # into a single control message by the OS.
+        self.checkRecvmsgFDs(2,
+                             self.doRecvmsg(self.serv_sock, len(MSG), 10240),
+                             maxcmsgs=2)
+
+    @testFDPassSeparate.client_skip
+    def _testFDPassSeparate(self):
+        fd0, fd1 = self.newFDs(2)
+        self.assertEqual(
+            self.sendmsgToServer([MSG], [(socket.SOL_SOCKET,
+                                          socket.SCM_RIGHTS,
+                                          array.array("i", [fd0])),
+                                         (socket.SOL_SOCKET,
+                                          socket.SCM_RIGHTS,
+                                          array.array("i", [fd1]))]),
+            len(MSG))
+
+    @requireAttrs(socket, "CMSG_SPACE")
+    def testFDPassSeparateMinSpace(self):
+        # Pass two FDs in two separate arrays, receiving them into the
+        # minimum space for two arrays.
+        self.checkRecvmsgFDs(2,
+                             self.doRecvmsg(self.serv_sock, len(MSG),
+                                            socket.CMSG_SPACE(SIZEOF_INT) +
+                                            socket.CMSG_LEN(SIZEOF_INT)),
+                             maxcmsgs=2, ignoreflags=socket.MSG_CTRUNC)
+
+    @testFDPassSeparateMinSpace.client_skip
+    def _testFDPassSeparateMinSpace(self):
+        fd0, fd1 = self.newFDs(2)
+        self.assertEqual(
+            self.sendmsgToServer([MSG], [(socket.SOL_SOCKET,
+                                          socket.SCM_RIGHTS,
+                                          array.array("i", [fd0])),
+                                         (socket.SOL_SOCKET,
+                                          socket.SCM_RIGHTS,
+                                          array.array("i", [fd1]))]),
+            len(MSG))
+
+    def sendAncillaryIfPossible(self, msg, ancdata):
+        # Try to send msg and ancdata to server, but if the system
+        # call fails, just send msg with no ancillary data.
+        try:
+            nbytes = self.sendmsgToServer([msg], ancdata)
+        except socket.error as e:
+            # Check that it was the system call that failed
+            self.assertIsInstance(e.errno, int)
+            nbytes = self.sendmsgToServer([msg])
+        self.assertEqual(nbytes, len(msg))
+
+    def testFDPassEmpty(self):
+        # Try to pass an empty FD array.  Can receive either no array
+        # or an empty array.
+        self.checkRecvmsgFDs(0, self.doRecvmsg(self.serv_sock,
+                                               len(MSG), 10240),
+                             ignoreflags=socket.MSG_CTRUNC)
+
+    def _testFDPassEmpty(self):
+        self.sendAncillaryIfPossible(MSG, [(socket.SOL_SOCKET,
+                                            socket.SCM_RIGHTS,
+                                            b"")])
+
+    def testFDPassPartialInt(self):
+        # Try to pass a truncated FD array.
+        msg, ancdata, flags, addr = self.doRecvmsg(self.serv_sock,
+                                                   len(MSG), 10240)
+        self.assertEqual(msg, MSG)
+        self.checkRecvmsgAddress(addr, self.cli_addr)
+        self.checkFlags(flags, eor=True, ignore=socket.MSG_CTRUNC)
+        self.assertLessEqual(len(ancdata), 1)
+        for cmsg_level, cmsg_type, cmsg_data in ancdata:
+            self.assertEqual(cmsg_level, socket.SOL_SOCKET)
+            self.assertEqual(cmsg_type, socket.SCM_RIGHTS)
+            self.assertLess(len(cmsg_data), SIZEOF_INT)
+
+    def _testFDPassPartialInt(self):
+        self.sendAncillaryIfPossible(
+            MSG,
+            [(socket.SOL_SOCKET,
+              socket.SCM_RIGHTS,
+              array.array("i", [self.badfd]).tobytes()[:-1])])
+
+    @requireAttrs(socket, "CMSG_SPACE")
+    def testFDPassPartialIntInMiddle(self):
+        # Try to pass two FD arrays, the first of which is truncated.
+        msg, ancdata, flags, addr = self.doRecvmsg(self.serv_sock,
+                                                   len(MSG), 10240)
+        self.assertEqual(msg, MSG)
+        self.checkRecvmsgAddress(addr, self.cli_addr)
+        self.checkFlags(flags, eor=True, ignore=socket.MSG_CTRUNC)
+        self.assertLessEqual(len(ancdata), 2)
+        fds = array.array("i")
+        # Arrays may have been combined in a single control message
+        for cmsg_level, cmsg_type, cmsg_data in ancdata:
+            self.assertEqual(cmsg_level, socket.SOL_SOCKET)
+            self.assertEqual(cmsg_type, socket.SCM_RIGHTS)
+            fds.frombytes(cmsg_data[:
+                    len(cmsg_data) - (len(cmsg_data) % fds.itemsize)])
+        self.assertLessEqual(len(fds), 2)
+        self.checkFDs(fds)
+
+    @testFDPassPartialIntInMiddle.client_skip
+    def _testFDPassPartialIntInMiddle(self):
+        fd0, fd1 = self.newFDs(2)
+        self.sendAncillaryIfPossible(
+            MSG,
+            [(socket.SOL_SOCKET,
+              socket.SCM_RIGHTS,
+              array.array("i", [fd0, self.badfd]).tobytes()[:-1]),
+             (socket.SOL_SOCKET,
+              socket.SCM_RIGHTS,
+              array.array("i", [fd1]))])
+
+    def checkTruncatedHeader(self, result, ignoreflags=0):
+        # Check that no ancillary data items are returned when data is
+        # truncated inside the cmsghdr structure.
+        msg, ancdata, flags, addr = result
+        self.assertEqual(msg, MSG)
+        self.checkRecvmsgAddress(addr, self.cli_addr)
+        self.assertEqual(ancdata, [])
+        self.checkFlags(flags, eor=True, checkset=socket.MSG_CTRUNC,
+                        ignore=ignoreflags)
+
+    def testCmsgTruncNoBufSize(self):
+        # Check that no ancillary data is received when no buffer size
+        # is specified.
+        self.checkTruncatedHeader(self.doRecvmsg(self.serv_sock, len(MSG)),
+                                  # BSD seems to set MSG_CTRUNC only
+                                  # if an item has been partially
+                                  # received.
+                                  ignoreflags=socket.MSG_CTRUNC)
+
+    def _testCmsgTruncNoBufSize(self):
+        self.createAndSendFDs(1)
+
+    def testCmsgTrunc0(self):
+        # Check that no ancillary data is received when buffer size is 0.
+        self.checkTruncatedHeader(self.doRecvmsg(self.serv_sock, len(MSG), 0),
+                                  ignoreflags=socket.MSG_CTRUNC)
+
+    def _testCmsgTrunc0(self):
+        self.createAndSendFDs(1)
+
+    # Check that no ancillary data is returned for various non-zero
+    # (but still too small) buffer sizes.
+
+    def testCmsgTrunc1(self):
+        self.checkTruncatedHeader(self.doRecvmsg(self.serv_sock, len(MSG), 1))
+
+    def _testCmsgTrunc1(self):
+        self.createAndSendFDs(1)
+
+    def testCmsgTrunc2Int(self):
+        # The cmsghdr structure has at least three members, two of
+        # which are ints, so we still shouldn't see any ancillary
+        # data.
+        self.checkTruncatedHeader(self.doRecvmsg(self.serv_sock, len(MSG),
+                                                 SIZEOF_INT * 2))
+
+    def _testCmsgTrunc2Int(self):
+        self.createAndSendFDs(1)
+
+    def testCmsgTruncLen0Minus1(self):
+        self.checkTruncatedHeader(self.doRecvmsg(self.serv_sock, len(MSG),
+                                                 socket.CMSG_LEN(0) - 1))
+
+    def _testCmsgTruncLen0Minus1(self):
+        self.createAndSendFDs(1)
+
+    # The following tests try to truncate the control message in the
+    # middle of the FD array.
+
+    def checkTruncatedArray(self, ancbuf, maxdata, mindata=0):
+        # Check that file descriptor data is truncated to between
+        # mindata and maxdata bytes when received with buffer size
+        # ancbuf, and that any complete file descriptor numbers are
+        # valid.
+        msg, ancdata, flags, addr = self.doRecvmsg(self.serv_sock,
+                                                   len(MSG), ancbuf)
+        self.assertEqual(msg, MSG)
+        self.checkRecvmsgAddress(addr, self.cli_addr)
+        self.checkFlags(flags, eor=True, checkset=socket.MSG_CTRUNC)
+
+        if mindata == 0 and ancdata == []:
+            return
+        self.assertEqual(len(ancdata), 1)
+        cmsg_level, cmsg_type, cmsg_data = ancdata[0]
+        self.assertEqual(cmsg_level, socket.SOL_SOCKET)
+        self.assertEqual(cmsg_type, socket.SCM_RIGHTS)
+        self.assertGreaterEqual(len(cmsg_data), mindata)
+        self.assertLessEqual(len(cmsg_data), maxdata)
+        fds = array.array("i")
+        fds.frombytes(cmsg_data[:
+                len(cmsg_data) - (len(cmsg_data) % fds.itemsize)])
+        self.checkFDs(fds)
+
+    def testCmsgTruncLen0(self):
+        self.checkTruncatedArray(ancbuf=socket.CMSG_LEN(0), maxdata=0)
+
+    def _testCmsgTruncLen0(self):
+        self.createAndSendFDs(1)
+
+    def testCmsgTruncLen0Plus1(self):
+        self.checkTruncatedArray(ancbuf=socket.CMSG_LEN(0) + 1, maxdata=1)
+
+    def _testCmsgTruncLen0Plus1(self):
+        self.createAndSendFDs(2)
+
+    def testCmsgTruncLen1(self):
+        self.checkTruncatedArray(ancbuf=socket.CMSG_LEN(SIZEOF_INT),
+                                 maxdata=SIZEOF_INT)
+
+    def _testCmsgTruncLen1(self):
+        self.createAndSendFDs(2)
+
+    def testCmsgTruncLen2Minus1(self):
+        self.checkTruncatedArray(ancbuf=socket.CMSG_LEN(2 * SIZEOF_INT) - 1,
+                                 maxdata=(2 * SIZEOF_INT) - 1)
+
+    def _testCmsgTruncLen2Minus1(self):
+        self.createAndSendFDs(2)
+
+
+class RFC3542AncillaryTest(SendrecvmsgServerTimeoutBase):
+    # Test sendmsg() and recvmsg[_into]() using the ancillary data
+    # features of the RFC 3542 Advanced Sockets API for IPv6.
+    # Currently we can only handle certain data items (e.g. traffic
+    # class, hop limit, MTU discovery and fragmentation settings)
+    # without resorting to unportable means such as the struct module,
+    # but the tests here are aimed at testing the ancillary data
+    # handling in sendmsg() and recvmsg() rather than the IPv6 API
+    # itself.
+
+    # Test value to use when setting hop limit of packet
+    hop_limit = 2
+
+    # Test value to use when setting traffic class of packet.
+    # -1 means "use kernel default".
+    traffic_class = -1
+
+    def ancillaryMapping(self, ancdata):
+        # Given ancillary data list ancdata, return a mapping from
+        # pairs (cmsg_level, cmsg_type) to corresponding cmsg_data.
+        # Check that no (level, type) pair appears more than once.
+        d = {}
+        for cmsg_level, cmsg_type, cmsg_data in ancdata:
+            self.assertNotIn((cmsg_level, cmsg_type), d)
+            d[(cmsg_level, cmsg_type)] = cmsg_data
+        return d
+
+    def checkHopLimit(self, ancbufsize, maxhop=255, ignoreflags=0):
+        # Receive hop limit into ancbufsize bytes of ancillary data
+        # space.  Check that data is MSG, ancillary data is not
+        # truncated (but ignore any flags in ignoreflags), and hop
+        # limit is between 0 and maxhop inclusive.
+        self.serv_sock.setsockopt(socket.IPPROTO_IPV6,
+                                  socket.IPV6_RECVHOPLIMIT, 1)
+        self.misc_event.set()
+        msg, ancdata, flags, addr = self.doRecvmsg(self.serv_sock,
+                                                   len(MSG), ancbufsize)
+
+        self.assertEqual(msg, MSG)
+        self.checkRecvmsgAddress(addr, self.cli_addr)
+        self.checkFlags(flags, eor=True, checkunset=socket.MSG_CTRUNC,
+                        ignore=ignoreflags)
+
+        self.assertEqual(len(ancdata), 1)
+        self.assertIsInstance(ancdata[0], tuple)
+        cmsg_level, cmsg_type, cmsg_data = ancdata[0]
+        self.assertEqual(cmsg_level, socket.IPPROTO_IPV6)
+        self.assertEqual(cmsg_type, socket.IPV6_HOPLIMIT)
+        self.assertIsInstance(cmsg_data, bytes)
+        self.assertEqual(len(cmsg_data), SIZEOF_INT)
+        a = array.array("i")
+        a.frombytes(cmsg_data)
+        self.assertGreaterEqual(a[0], 0)
+        self.assertLessEqual(a[0], maxhop)
+
+    @requireAttrs(socket, "IPV6_RECVHOPLIMIT", "IPV6_HOPLIMIT")
+    def testRecvHopLimit(self):
+        # Test receiving the packet hop limit as ancillary data.
+        self.checkHopLimit(ancbufsize=10240)
+
+    @testRecvHopLimit.client_skip
+    def _testRecvHopLimit(self):
+        # Need to wait until server has asked to receive ancillary
+        # data, as implementations are not required to buffer it
+        # otherwise.
+        self.assertTrue(self.misc_event.wait(timeout=self.fail_timeout))
+        self.sendToServer(MSG)
+
+    @requireAttrs(socket, "CMSG_SPACE", "IPV6_RECVHOPLIMIT", "IPV6_HOPLIMIT")
+    def testRecvHopLimitCMSG_SPACE(self):
+        # Test receiving hop limit, using CMSG_SPACE to calculate buffer size.
+        self.checkHopLimit(ancbufsize=socket.CMSG_SPACE(SIZEOF_INT))
+
+    @testRecvHopLimitCMSG_SPACE.client_skip
+    def _testRecvHopLimitCMSG_SPACE(self):
+        self.assertTrue(self.misc_event.wait(timeout=self.fail_timeout))
+        self.sendToServer(MSG)
+
+    # Could test receiving into buffer sized using CMSG_LEN, but RFC
+    # 3542 says portable applications must provide space for trailing
+    # padding.  Implementations may set MSG_CTRUNC if there isn't
+    # enough space for the padding.
+
+    @requireAttrs(socket.socket, "sendmsg")
+    @requireAttrs(socket, "IPV6_RECVHOPLIMIT", "IPV6_HOPLIMIT")
+    def testSetHopLimit(self):
+        # Test setting hop limit on outgoing packet and receiving it
+        # at the other end.
+        self.checkHopLimit(ancbufsize=10240, maxhop=self.hop_limit)
+
+    @testSetHopLimit.client_skip
+    def _testSetHopLimit(self):
+        self.assertTrue(self.misc_event.wait(timeout=self.fail_timeout))
+        self.assertEqual(
+            self.sendmsgToServer([MSG],
+                                 [(socket.IPPROTO_IPV6, socket.IPV6_HOPLIMIT,
+                                   array.array("i", [self.hop_limit]))]),
+            len(MSG))
+
+    def checkTrafficClassAndHopLimit(self, ancbufsize, maxhop=255,
+                                     ignoreflags=0):
+        # Receive traffic class and hop limit into ancbufsize bytes of
+        # ancillary data space.  Check that data is MSG, ancillary
+        # data is not truncated (but ignore any flags in ignoreflags),
+        # and traffic class and hop limit are in range (hop limit no
+        # more than maxhop).
+        self.serv_sock.setsockopt(socket.IPPROTO_IPV6,
+                                  socket.IPV6_RECVHOPLIMIT, 1)
+        self.serv_sock.setsockopt(socket.IPPROTO_IPV6,
+                                  socket.IPV6_RECVTCLASS, 1)
+        self.misc_event.set()
+        msg, ancdata, flags, addr = self.doRecvmsg(self.serv_sock,
+                                                   len(MSG), ancbufsize)
+
+        self.assertEqual(msg, MSG)
+        self.checkRecvmsgAddress(addr, self.cli_addr)
+        self.checkFlags(flags, eor=True, checkunset=socket.MSG_CTRUNC,
+                        ignore=ignoreflags)
+        self.assertEqual(len(ancdata), 2)
+        ancmap = self.ancillaryMapping(ancdata)
+
+        tcdata = ancmap[(socket.IPPROTO_IPV6, socket.IPV6_TCLASS)]
+        self.assertEqual(len(tcdata), SIZEOF_INT)
+        a = array.array("i")
+        a.frombytes(tcdata)
+        self.assertGreaterEqual(a[0], 0)
+        self.assertLessEqual(a[0], 255)
+
+        hldata = ancmap[(socket.IPPROTO_IPV6, socket.IPV6_HOPLIMIT)]
+        self.assertEqual(len(hldata), SIZEOF_INT)
+        a = array.array("i")
+        a.frombytes(hldata)
+        self.assertGreaterEqual(a[0], 0)
+        self.assertLessEqual(a[0], maxhop)
+
+    @requireAttrs(socket, "IPV6_RECVHOPLIMIT", "IPV6_HOPLIMIT",
+                  "IPV6_RECVTCLASS", "IPV6_TCLASS")
+    def testRecvTrafficClassAndHopLimit(self):
+        # Test receiving traffic class and hop limit as ancillary data.
+        self.checkTrafficClassAndHopLimit(ancbufsize=10240)
+
+    @testRecvTrafficClassAndHopLimit.client_skip
+    def _testRecvTrafficClassAndHopLimit(self):
+        self.assertTrue(self.misc_event.wait(timeout=self.fail_timeout))
+        self.sendToServer(MSG)
+
+    @requireAttrs(socket, "CMSG_SPACE", "IPV6_RECVHOPLIMIT", "IPV6_HOPLIMIT",
+                  "IPV6_RECVTCLASS", "IPV6_TCLASS")
+    def testRecvTrafficClassAndHopLimitCMSG_SPACE(self):
+        # Test receiving traffic class and hop limit, using
+        # CMSG_SPACE() to calculate buffer size.
+        self.checkTrafficClassAndHopLimit(
+            ancbufsize=socket.CMSG_SPACE(SIZEOF_INT) * 2)
+
+    @testRecvTrafficClassAndHopLimitCMSG_SPACE.client_skip
+    def _testRecvTrafficClassAndHopLimitCMSG_SPACE(self):
+        self.assertTrue(self.misc_event.wait(timeout=self.fail_timeout))
+        self.sendToServer(MSG)
+
+    @requireAttrs(socket.socket, "sendmsg")
+    @requireAttrs(socket, "CMSG_SPACE", "IPV6_RECVHOPLIMIT", "IPV6_HOPLIMIT",
+                  "IPV6_RECVTCLASS", "IPV6_TCLASS")
+    def testSetTrafficClassAndHopLimit(self):
+        # Test setting traffic class and hop limit on outgoing packet,
+        # and receiving them at the other end.
+        self.checkTrafficClassAndHopLimit(ancbufsize=10240,
+                                          maxhop=self.hop_limit)
+
+    @testSetTrafficClassAndHopLimit.client_skip
+    def _testSetTrafficClassAndHopLimit(self):
+        self.assertTrue(self.misc_event.wait(timeout=self.fail_timeout))
+        self.assertEqual(
+            self.sendmsgToServer([MSG],
+                                 [(socket.IPPROTO_IPV6, socket.IPV6_TCLASS,
+                                   array.array("i", [self.traffic_class])),
+                                  (socket.IPPROTO_IPV6, socket.IPV6_HOPLIMIT,
+                                   array.array("i", [self.hop_limit]))]),
+            len(MSG))
+
+    @requireAttrs(socket.socket, "sendmsg")
+    @requireAttrs(socket, "CMSG_SPACE", "IPV6_RECVHOPLIMIT", "IPV6_HOPLIMIT",
+                  "IPV6_RECVTCLASS", "IPV6_TCLASS")
+    def testOddCmsgSize(self):
+        # Try to send ancillary data with first item one byte too
+        # long.  Fall back to sending with correct size if this fails,
+        # and check that second item was handled correctly.
+        self.checkTrafficClassAndHopLimit(ancbufsize=10240,
+                                          maxhop=self.hop_limit)
+
+    @testOddCmsgSize.client_skip
+    def _testOddCmsgSize(self):
+        self.assertTrue(self.misc_event.wait(timeout=self.fail_timeout))
+        try:  # first attempt: traffic class item is one byte too long
+            nbytes = self.sendmsgToServer(
+                [MSG],
+                [(socket.IPPROTO_IPV6, socket.IPV6_TCLASS,
+                  array.array("i", [self.traffic_class]).tobytes() + b"\x00"),
+                 (socket.IPPROTO_IPV6, socket.IPV6_HOPLIMIT,
+                  array.array("i", [self.hop_limit]))])
+        except socket.error as e:  # rejected: retry with exact item size
+            self.assertIsInstance(e.errno, int)
+            nbytes = self.sendmsgToServer(
+                [MSG],
+                [(socket.IPPROTO_IPV6, socket.IPV6_TCLASS,
+                  array.array("i", [self.traffic_class])),
+                 (socket.IPPROTO_IPV6, socket.IPV6_HOPLIMIT,
+                  array.array("i", [self.hop_limit]))])
+        self.assertEqual(nbytes, len(MSG))  # verified for whichever send worked
+
+    # Tests for proper handling of truncated ancillary data
+
+    def checkHopLimitTruncatedHeader(self, ancbufsize, ignoreflags=0):
+        # Receive hop limit into ancbufsize bytes of ancillary data
+        # space, which should be too small to contain the ancillary
+        # data header (if ancbufsize is None, pass no second argument
+        # to recvmsg()).  Check that data is MSG, MSG_CTRUNC is set
+        # (unless included in ignoreflags), and no ancillary data is
+        # returned.
+        self.serv_sock.setsockopt(socket.IPPROTO_IPV6,
+                                  socket.IPV6_RECVHOPLIMIT, 1)
+        self.misc_event.set()
+        args = () if ancbufsize is None else (ancbufsize,)
+        msg, ancdata, flags, addr = self.doRecvmsg(self.serv_sock,
+                                                   len(MSG), *args)
+
+        self.assertEqual(msg, MSG)
+        self.checkRecvmsgAddress(addr, self.cli_addr)
+        self.assertEqual(ancdata, [])
+        self.checkFlags(flags, eor=True, checkset=socket.MSG_CTRUNC,
+                        ignore=ignoreflags)
+
+    @requireAttrs(socket, "IPV6_RECVHOPLIMIT", "IPV6_HOPLIMIT")
+    def testCmsgTruncNoBufSize(self):
+        # Check that no ancillary data is received when no ancillary
+        # buffer size is provided.
+        self.checkHopLimitTruncatedHeader(ancbufsize=None,
+                                          # BSD seems to set
+                                          # MSG_CTRUNC only if an item
+                                          # has been partially
+                                          # received.
+                                          ignoreflags=socket.MSG_CTRUNC)
+
+    @testCmsgTruncNoBufSize.client_skip
+    def _testCmsgTruncNoBufSize(self):
+        self.assertTrue(self.misc_event.wait(timeout=self.fail_timeout))
+        self.sendToServer(MSG)
+
+    @requireAttrs(socket, "IPV6_RECVHOPLIMIT", "IPV6_HOPLIMIT")
+    def testSingleCmsgTrunc0(self):
+        # Check that no ancillary data is received when ancillary
+        # buffer size is zero.
+        self.checkHopLimitTruncatedHeader(ancbufsize=0,
+                                          ignoreflags=socket.MSG_CTRUNC)
+
+    @testSingleCmsgTrunc0.client_skip
+    def _testSingleCmsgTrunc0(self):
+        self.assertTrue(self.misc_event.wait(timeout=self.fail_timeout))
+        self.sendToServer(MSG)
+
+    # Check that no ancillary data is returned for various non-zero
+    # (but still too small) buffer sizes.
+
+    @requireAttrs(socket, "IPV6_RECVHOPLIMIT", "IPV6_HOPLIMIT")
+    def testSingleCmsgTrunc1(self):
+        self.checkHopLimitTruncatedHeader(ancbufsize=1)
+
+    @testSingleCmsgTrunc1.client_skip
+    def _testSingleCmsgTrunc1(self):
+        self.assertTrue(self.misc_event.wait(timeout=self.fail_timeout))
+        self.sendToServer(MSG)
+
+    @requireAttrs(socket, "IPV6_RECVHOPLIMIT", "IPV6_HOPLIMIT")
+    def testSingleCmsgTrunc2Int(self):
+        self.checkHopLimitTruncatedHeader(ancbufsize=2 * SIZEOF_INT)
+
+    @testSingleCmsgTrunc2Int.client_skip
+    def _testSingleCmsgTrunc2Int(self):
+        self.assertTrue(self.misc_event.wait(timeout=self.fail_timeout))
+        self.sendToServer(MSG)
+
+    @requireAttrs(socket, "IPV6_RECVHOPLIMIT", "IPV6_HOPLIMIT")
+    def testSingleCmsgTruncLen0Minus1(self):
+        self.checkHopLimitTruncatedHeader(ancbufsize=socket.CMSG_LEN(0) - 1)
+
+    @testSingleCmsgTruncLen0Minus1.client_skip
+    def _testSingleCmsgTruncLen0Minus1(self):
+        self.assertTrue(self.misc_event.wait(timeout=self.fail_timeout))
+        self.sendToServer(MSG)
+
+    @requireAttrs(socket, "IPV6_RECVHOPLIMIT", "IPV6_HOPLIMIT")
+    def testSingleCmsgTruncInData(self):
+        # Test truncation of a control message inside its associated
+        # data.  The message may be returned with its data truncated,
+        # or not returned at all.
+        self.serv_sock.setsockopt(socket.IPPROTO_IPV6,
+                                  socket.IPV6_RECVHOPLIMIT, 1)
+        self.misc_event.set()
+        msg, ancdata, flags, addr = self.doRecvmsg(
+            self.serv_sock, len(MSG), socket.CMSG_LEN(SIZEOF_INT) - 1)
+
+        self.assertEqual(msg, MSG)
+        self.checkRecvmsgAddress(addr, self.cli_addr)
+        self.checkFlags(flags, eor=True, checkset=socket.MSG_CTRUNC)
+
+        self.assertLessEqual(len(ancdata), 1)
+        if ancdata:
+            cmsg_level, cmsg_type, cmsg_data = ancdata[0]
+            self.assertEqual(cmsg_level, socket.IPPROTO_IPV6)
+            self.assertEqual(cmsg_type, socket.IPV6_HOPLIMIT)
+            self.assertLess(len(cmsg_data), SIZEOF_INT)
+
+    @testSingleCmsgTruncInData.client_skip
+    def _testSingleCmsgTruncInData(self):
+        self.assertTrue(self.misc_event.wait(timeout=self.fail_timeout))
+        self.sendToServer(MSG)
+
+    def checkTruncatedSecondHeader(self, ancbufsize, ignoreflags=0):
+        # Receive traffic class and hop limit into ancbufsize bytes of
+        # ancillary data space, which should be large enough to
+        # contain the first item, but too small to contain the header
+        # of the second.  Check that data is MSG, MSG_CTRUNC is set
+        # (unless included in ignoreflags), and only one ancillary
+        # data item is returned.
+        self.serv_sock.setsockopt(socket.IPPROTO_IPV6,
+                                  socket.IPV6_RECVHOPLIMIT, 1)
+        self.serv_sock.setsockopt(socket.IPPROTO_IPV6,
+                                  socket.IPV6_RECVTCLASS, 1)
+        self.misc_event.set()  # options are set: release the waiting client
+        msg, ancdata, flags, addr = self.doRecvmsg(self.serv_sock,
+                                                   len(MSG), ancbufsize)
+
+        self.assertEqual(msg, MSG)
+        self.checkRecvmsgAddress(addr, self.cli_addr)
+        self.checkFlags(flags, eor=True, checkset=socket.MSG_CTRUNC,
+                        ignore=ignoreflags)
+
+        self.assertEqual(len(ancdata), 1)
+        cmsg_level, cmsg_type, cmsg_data = ancdata[0]
+        self.assertEqual(cmsg_level, socket.IPPROTO_IPV6)
+        self.assertIn(cmsg_type, {socket.IPV6_TCLASS, socket.IPV6_HOPLIMIT})
+        self.assertEqual(len(cmsg_data), SIZEOF_INT)
+        a = array.array("i")
+        a.frombytes(cmsg_data)  # decode the item's payload as one native int
+        self.assertGreaterEqual(a[0], 0)
+        self.assertLessEqual(a[0], 255)
+
+    # Try the above test with various buffer sizes.
+
+    @requireAttrs(socket, "CMSG_SPACE", "IPV6_RECVHOPLIMIT", "IPV6_HOPLIMIT",
+                  "IPV6_RECVTCLASS", "IPV6_TCLASS")
+    def testSecondCmsgTrunc0(self):
+        self.checkTruncatedSecondHeader(socket.CMSG_SPACE(SIZEOF_INT),
+                                        ignoreflags=socket.MSG_CTRUNC)
+
+    @testSecondCmsgTrunc0.client_skip
+    def _testSecondCmsgTrunc0(self):
+        self.assertTrue(self.misc_event.wait(timeout=self.fail_timeout))
+        self.sendToServer(MSG)
+
+    @requireAttrs(socket, "CMSG_SPACE", "IPV6_RECVHOPLIMIT", "IPV6_HOPLIMIT",
+                  "IPV6_RECVTCLASS", "IPV6_TCLASS")
+    def testSecondCmsgTrunc1(self):
+        self.checkTruncatedSecondHeader(socket.CMSG_SPACE(SIZEOF_INT) + 1)
+
+    @testSecondCmsgTrunc1.client_skip
+    def _testSecondCmsgTrunc1(self):
+        self.assertTrue(self.misc_event.wait(timeout=self.fail_timeout))
+        self.sendToServer(MSG)
+
+    @requireAttrs(socket, "CMSG_SPACE", "IPV6_RECVHOPLIMIT", "IPV6_HOPLIMIT",
+                  "IPV6_RECVTCLASS", "IPV6_TCLASS")
+    def testSecondCmsgTrunc2Int(self):
+        self.checkTruncatedSecondHeader(socket.CMSG_SPACE(SIZEOF_INT) +
+                                        2 * SIZEOF_INT)
+
+    @testSecondCmsgTrunc2Int.client_skip
+    def _testSecondCmsgTrunc2Int(self):
+        self.assertTrue(self.misc_event.wait(timeout=self.fail_timeout))
+        self.sendToServer(MSG)
+
+    @requireAttrs(socket, "CMSG_SPACE", "IPV6_RECVHOPLIMIT", "IPV6_HOPLIMIT",
+                  "IPV6_RECVTCLASS", "IPV6_TCLASS")
+    def testSecondCmsgTruncLen0Minus1(self):
+        self.checkTruncatedSecondHeader(socket.CMSG_SPACE(SIZEOF_INT) +
+                                        socket.CMSG_LEN(0) - 1)
+
+    @testSecondCmsgTruncLen0Minus1.client_skip
+    def _testSecondCmsgTruncLen0Minus1(self):
+        self.assertTrue(self.misc_event.wait(timeout=self.fail_timeout))
+        self.sendToServer(MSG)
+
+    @requireAttrs(socket, "CMSG_SPACE", "IPV6_RECVHOPLIMIT", "IPV6_HOPLIMIT",
+                  "IPV6_RECVTCLASS", "IPV6_TCLASS")
+    def testSecomdCmsgTruncInData(self):
+        # Test truncation of the second of two control messages inside
+        # its associated data.
+        self.serv_sock.setsockopt(socket.IPPROTO_IPV6,
+                                  socket.IPV6_RECVHOPLIMIT, 1)
+        self.serv_sock.setsockopt(socket.IPPROTO_IPV6,
+                                  socket.IPV6_RECVTCLASS, 1)
+        self.misc_event.set()  # options are set: release the waiting client
+        msg, ancdata, flags, addr = self.doRecvmsg(
+            self.serv_sock, len(MSG),
+            socket.CMSG_SPACE(SIZEOF_INT) + socket.CMSG_LEN(SIZEOF_INT) - 1)
+
+        self.assertEqual(msg, MSG)
+        self.checkRecvmsgAddress(addr, self.cli_addr)
+        self.checkFlags(flags, eor=True, checkset=socket.MSG_CTRUNC)
+
+        cmsg_types = {socket.IPV6_TCLASS, socket.IPV6_HOPLIMIT}
+
+        cmsg_level, cmsg_type, cmsg_data = ancdata.pop(0)
+        self.assertEqual(cmsg_level, socket.IPPROTO_IPV6)
+        cmsg_types.remove(cmsg_type)  # KeyError on unexpected/repeated type
+        self.assertEqual(len(cmsg_data), SIZEOF_INT)
+        a = array.array("i")
+        a.frombytes(cmsg_data)  # decode the item's payload as one native int
+        self.assertGreaterEqual(a[0], 0)
+        self.assertLessEqual(a[0], 255)
+
+        if ancdata:  # second item is optional, but must be truncated if present
+            cmsg_level, cmsg_type, cmsg_data = ancdata.pop(0)
+            self.assertEqual(cmsg_level, socket.IPPROTO_IPV6)
+            cmsg_types.remove(cmsg_type)
+            self.assertLess(len(cmsg_data), SIZEOF_INT)
+
+        self.assertEqual(ancdata, [])
+
+    @testSecomdCmsgTruncInData.client_skip
+    def _testSecomdCmsgTruncInData(self):
+        self.assertTrue(self.misc_event.wait(timeout=self.fail_timeout))
+        self.sendToServer(MSG)
+
+
+# Derive concrete test classes for different socket types.
+
+class SendrecvmsgUDPTestBase(SendrecvmsgDgramFlagsBase,
+                             SendrecvmsgConnectionlessBase,
+                             ThreadedSocketTestMixin, UDPTestBase):
+    pass
+
+@requireAttrs(socket.socket, "sendmsg")
+@unittest.skipUnless(thread, 'Threading required for this test.')
+class SendmsgUDPTest(SendmsgConnectionlessTests, SendrecvmsgUDPTestBase):
+    pass
+
+@requireAttrs(socket.socket, "recvmsg")
+@unittest.skipUnless(thread, 'Threading required for this test.')
+class RecvmsgUDPTest(RecvmsgTests, SendrecvmsgUDPTestBase):
+    pass
+
+@requireAttrs(socket.socket, "recvmsg_into")
+@unittest.skipUnless(thread, 'Threading required for this test.')
+class RecvmsgIntoUDPTest(RecvmsgIntoTests, SendrecvmsgUDPTestBase):
+    pass
+
+
+class SendrecvmsgUDP6TestBase(SendrecvmsgDgramFlagsBase,
+                              SendrecvmsgConnectionlessBase,
+                              ThreadedSocketTestMixin, UDP6TestBase):
+    pass
+
+@requireAttrs(socket.socket, "sendmsg")
+@unittest.skipUnless(socket.has_ipv6, "Python not built with IPv6 support")
+@requireSocket("AF_INET6", "SOCK_DGRAM")
+@unittest.skipUnless(thread, 'Threading required for this test.')
+class SendmsgUDP6Test(SendmsgConnectionlessTests, SendrecvmsgUDP6TestBase):
+    pass
+
+@requireAttrs(socket.socket, "recvmsg")
+@unittest.skipUnless(socket.has_ipv6, "Python not built with IPv6 support")
+@requireSocket("AF_INET6", "SOCK_DGRAM")
+@unittest.skipUnless(thread, 'Threading required for this test.')
+class RecvmsgUDP6Test(RecvmsgTests, SendrecvmsgUDP6TestBase):
+    pass
+
+@requireAttrs(socket.socket, "recvmsg_into")
+@unittest.skipUnless(socket.has_ipv6, "Python not built with IPv6 support")
+@requireSocket("AF_INET6", "SOCK_DGRAM")
+@unittest.skipUnless(thread, 'Threading required for this test.')
+class RecvmsgIntoUDP6Test(RecvmsgIntoTests, SendrecvmsgUDP6TestBase):
+    pass
+
+@requireAttrs(socket.socket, "recvmsg")
+@unittest.skipUnless(socket.has_ipv6, "Python not built with IPv6 support")
+@requireAttrs(socket, "IPPROTO_IPV6")
+@requireSocket("AF_INET6", "SOCK_DGRAM")
+@unittest.skipUnless(thread, 'Threading required for this test.')
+class RecvmsgRFC3542AncillaryUDP6Test(RFC3542AncillaryTest,
+                                      SendrecvmsgUDP6TestBase):
+    pass
+
+@requireAttrs(socket.socket, "recvmsg_into")
+@unittest.skipUnless(socket.has_ipv6, "Python not built with IPv6 support")
+@requireAttrs(socket, "IPPROTO_IPV6")
+@requireSocket("AF_INET6", "SOCK_DGRAM")
+@unittest.skipUnless(thread, 'Threading required for this test.')
+class RecvmsgIntoRFC3542AncillaryUDP6Test(RecvmsgIntoMixin,
+                                          RFC3542AncillaryTest,
+                                          SendrecvmsgUDP6TestBase):
+    pass
+
+
+class SendrecvmsgTCPTestBase(SendrecvmsgConnectedBase,
+                             ConnectedStreamTestMixin, TCPTestBase):
+    pass
+
+@requireAttrs(socket.socket, "sendmsg")
+@unittest.skipUnless(thread, 'Threading required for this test.')
+class SendmsgTCPTest(SendmsgStreamTests, SendrecvmsgTCPTestBase):
+    pass
+
+@requireAttrs(socket.socket, "recvmsg")
+@unittest.skipUnless(thread, 'Threading required for this test.')
+class RecvmsgTCPTest(RecvmsgTests, RecvmsgGenericStreamTests,
+                     SendrecvmsgTCPTestBase):
+    pass
+
+@requireAttrs(socket.socket, "recvmsg_into")
+@unittest.skipUnless(thread, 'Threading required for this test.')
+class RecvmsgIntoTCPTest(RecvmsgIntoTests, RecvmsgGenericStreamTests,
+                         SendrecvmsgTCPTestBase):
+    pass
+
+
+class SendrecvmsgSCTPStreamTestBase(SendrecvmsgSCTPFlagsBase,
+                                    SendrecvmsgConnectedBase,
+                                    ConnectedStreamTestMixin, SCTPStreamBase):
+    pass
+
+@requireAttrs(socket.socket, "sendmsg")
+@requireSocket("AF_INET", "SOCK_STREAM", "IPPROTO_SCTP")
+@unittest.skipUnless(thread, 'Threading required for this test.')
+class SendmsgSCTPStreamTest(SendmsgStreamTests, SendrecvmsgSCTPStreamTestBase):
+    pass
+
+@requireAttrs(socket.socket, "recvmsg")
+@requireSocket("AF_INET", "SOCK_STREAM", "IPPROTO_SCTP")
+@unittest.skipUnless(thread, 'Threading required for this test.')
+class RecvmsgSCTPStreamTest(RecvmsgTests, RecvmsgGenericStreamTests,
+                            SendrecvmsgSCTPStreamTestBase):
+    pass
+
+@requireAttrs(socket.socket, "recvmsg_into")
+@requireSocket("AF_INET", "SOCK_STREAM", "IPPROTO_SCTP")
+@unittest.skipUnless(thread, 'Threading required for this test.')
+class RecvmsgIntoSCTPStreamTest(RecvmsgIntoTests, RecvmsgGenericStreamTests,
+                                SendrecvmsgSCTPStreamTestBase):
+    pass
+
+
+class SendrecvmsgUnixStreamTestBase(SendrecvmsgConnectedBase,
+                                    ConnectedStreamTestMixin, UnixStreamBase):
+    pass
+
+@requireAttrs(socket.socket, "sendmsg")
+@requireAttrs(socket, "AF_UNIX")
+@unittest.skipUnless(thread, 'Threading required for this test.')
+class SendmsgUnixStreamTest(SendmsgStreamTests, SendrecvmsgUnixStreamTestBase):
+    pass
+
+@requireAttrs(socket.socket, "recvmsg")
+@requireAttrs(socket, "AF_UNIX")
+@unittest.skipUnless(thread, 'Threading required for this test.')
+class RecvmsgUnixStreamTest(RecvmsgTests, RecvmsgGenericStreamTests,
+                            SendrecvmsgUnixStreamTestBase):
+    pass
+
+@requireAttrs(socket.socket, "recvmsg_into")
+@requireAttrs(socket, "AF_UNIX")
+@unittest.skipUnless(thread, 'Threading required for this test.')
+class RecvmsgIntoUnixStreamTest(RecvmsgIntoTests, RecvmsgGenericStreamTests,
+                                SendrecvmsgUnixStreamTestBase):
+    pass
+
+@requireAttrs(socket.socket, "sendmsg", "recvmsg")
+@requireAttrs(socket, "AF_UNIX", "SOL_SOCKET", "SCM_RIGHTS")
+@unittest.skipUnless(thread, 'Threading required for this test.')
+class RecvmsgSCMRightsStreamTest(SCMRightsTest, SendrecvmsgUnixStreamTestBase):
+    pass
+
+@requireAttrs(socket.socket, "sendmsg", "recvmsg_into")
+@requireAttrs(socket, "AF_UNIX", "SOL_SOCKET", "SCM_RIGHTS")
+@unittest.skipUnless(thread, 'Threading required for this test.')
+class RecvmsgIntoSCMRightsStreamTest(RecvmsgIntoMixin, SCMRightsTest,
+                                     SendrecvmsgUnixStreamTestBase):
+    pass
+
+
+# Test interrupting the interruptible send/receive methods with a
+# signal when a timeout is set.  These tests avoid having multiple
+# threads alive during the test so that the OS cannot deliver the
+# signal to the wrong one.
+
+class InterruptedTimeoutBase(unittest.TestCase):
+    # Base class for interrupted send/receive tests.  Installs an
+    # empty handler for SIGALRM and removes it on teardown, along with
+    # any scheduled alarms.
+
+    def setUp(self):
+        super().setUp()
+        orig_alrm_handler = signal.signal(signal.SIGALRM,
+                                          lambda signum, frame: None)
+        self.addCleanup(signal.signal, signal.SIGALRM, orig_alrm_handler)
+        self.addCleanup(self.setAlarm, 0)
+
+    # Timeout for socket operations
+    timeout = 4.0
+
+    # Provide setAlarm() method to schedule delivery of SIGALRM after
+    # given number of seconds, or cancel it if zero, and an
+    # appropriate time value to use.  Use setitimer() if available.
+    if hasattr(signal, "setitimer"):
+        alarm_time = 0.05
+
+        def setAlarm(self, seconds):
+            signal.setitimer(signal.ITIMER_REAL, seconds)
+    else:
+        # Old systems may deliver the alarm up to one second early
+        alarm_time = 2
+
+        def setAlarm(self, seconds):
+            signal.alarm(seconds)
+
+
+# Require siginterrupt() in order to ensure that system calls are
+# interrupted by default.
+@requireAttrs(signal, "siginterrupt")
+@unittest.skipUnless(hasattr(signal, "alarm") or hasattr(signal, "setitimer"),
+                     "Don't have signal.alarm or signal.setitimer")
+class InterruptedRecvTimeoutTest(InterruptedTimeoutBase, UDPTestBase):
+    # Test interrupting the recv*() methods with signals when a
+    # timeout is set.
+
+    def setUp(self):
+        super().setUp()
+        self.serv.settimeout(self.timeout)
+
+    def checkInterruptedRecv(self, func, *args, **kwargs):
+        # Check that func(*args, **kwargs) raises socket.error with an
+        # errno of EINTR when interrupted by a signal.
+        self.setAlarm(self.alarm_time)
+        with self.assertRaises(socket.error) as cm:
+            func(*args, **kwargs)
+        self.assertNotIsInstance(cm.exception, socket.timeout)
+        self.assertEqual(cm.exception.errno, errno.EINTR)
+
+    def testInterruptedRecvTimeout(self):
+        self.checkInterruptedRecv(self.serv.recv, 1024)
+
+    def testInterruptedRecvIntoTimeout(self):
+        self.checkInterruptedRecv(self.serv.recv_into, bytearray(1024))
+
+    def testInterruptedRecvfromTimeout(self):
+        self.checkInterruptedRecv(self.serv.recvfrom, 1024)
+
+    def testInterruptedRecvfromIntoTimeout(self):
+        self.checkInterruptedRecv(self.serv.recvfrom_into, bytearray(1024))
+
+    @requireAttrs(socket.socket, "recvmsg")
+    def testInterruptedRecvmsgTimeout(self):
+        self.checkInterruptedRecv(self.serv.recvmsg, 1024)
+
+    @requireAttrs(socket.socket, "recvmsg_into")
+    def testInterruptedRecvmsgIntoTimeout(self):
+        self.checkInterruptedRecv(self.serv.recvmsg_into, [bytearray(1024)])
+
+
+# Require siginterrupt() in order to ensure that system calls are
+# interrupted by default.
+@requireAttrs(signal, "siginterrupt")
+@unittest.skipUnless(hasattr(signal, "alarm") or hasattr(signal, "setitimer"),
+                     "Don't have signal.alarm or signal.setitimer")
+@unittest.skipUnless(thread, 'Threading required for this test.')
+class InterruptedSendTimeoutTest(InterruptedTimeoutBase,
+                                 ThreadSafeCleanupTestCase,
+                                 SocketListeningTestMixin, TCPTestBase):
+    # Test interrupting the interruptible send*() methods with signals
+    # when a timeout is set.
+
+    def setUp(self):
+        super().setUp()
+        self.serv_conn = self.newSocket()
+        self.addCleanup(self.serv_conn.close)
+        # Use a thread to complete the connection, but wait for it to
+        # terminate before running the test, so that there is only one
+        # thread to accept the signal.
+        cli_thread = threading.Thread(target=self.doConnect)
+        cli_thread.start()
+        self.cli_conn, addr = self.serv.accept()
+        self.addCleanup(self.cli_conn.close)
+        cli_thread.join()
+        self.serv_conn.settimeout(self.timeout)
+
+    def doConnect(self):
+        self.serv_conn.connect(self.serv_addr)
+
+    def checkInterruptedSend(self, func, *args, **kwargs):
+        # Check that func(*args, **kwargs), run in a loop, raises
+        # socket.error with an errno of EINTR when interrupted by a
+        # signal.
+        with self.assertRaises(socket.error) as cm:
+            while True:
+                self.setAlarm(self.alarm_time)
+                func(*args, **kwargs)
+        self.assertNotIsInstance(cm.exception, socket.timeout)
+        self.assertEqual(cm.exception.errno, errno.EINTR)
+
+    def testInterruptedSendTimeout(self):
+        self.checkInterruptedSend(self.serv_conn.send, b"a"*512)
+
+    def testInterruptedSendtoTimeout(self):
+        # Passing an actual address here as Python's wrapper for
+        # sendto() doesn't allow passing a zero-length one; POSIX
+        # requires that the address is ignored since the socket is
+        # connection-mode, however.
+        self.checkInterruptedSend(self.serv_conn.sendto, b"a"*512,
+                                  self.serv_addr)
+
+    @requireAttrs(socket.socket, "sendmsg")
+    def testInterruptedSendmsgTimeout(self):
+        self.checkInterruptedSend(self.serv_conn.sendmsg, [b"a"*512])
+
+
 @unittest.skipUnless(thread, 'Threading required for this test.')
 class TCPCloserTest(ThreadedTCPSocketTest):
 
@@ -2077,6 +4172,31 @@
     if isTipcAvailable():
         tests.append(TIPCTest)
         tests.append(TIPCThreadableTest)
+    tests.extend([
+        CmsgMacroTests,
+        SendmsgUDPTest,
+        RecvmsgUDPTest,
+        RecvmsgIntoUDPTest,
+        SendmsgUDP6Test,
+        RecvmsgUDP6Test,
+        RecvmsgRFC3542AncillaryUDP6Test,
+        RecvmsgIntoRFC3542AncillaryUDP6Test,
+        RecvmsgIntoUDP6Test,
+        SendmsgTCPTest,
+        RecvmsgTCPTest,
+        RecvmsgIntoTCPTest,
+        SendmsgSCTPStreamTest,
+        RecvmsgSCTPStreamTest,
+        RecvmsgIntoSCTPStreamTest,
+        SendmsgUnixStreamTest,
+        RecvmsgUnixStreamTest,
+        RecvmsgIntoUnixStreamTest,
+        RecvmsgSCMRightsStreamTest,
+        RecvmsgIntoSCMRightsStreamTest,
+        # These are slow when setitimer() is not available
+        InterruptedRecvTimeoutTest,
+        InterruptedSendTimeoutTest,
+    ])
 
     thread_info = support.threading_setup()
     support.run_unittest(*tests)