Issue #10141: socket: add SocketCAN (PF_CAN) support. Initial patch by Matthias
Fuchs, updated by Tiago Gonçalves.
diff --git a/Lib/test/test_socket.py b/Lib/test/test_socket.py
index 79160f4..1558570 100644
--- a/Lib/test/test_socket.py
+++ b/Lib/test/test_socket.py
@@ -21,6 +21,7 @@
 import signal
 import math
 import pickle
+import struct
 try:
     import fcntl
 except ImportError:
@@ -36,6 +37,18 @@
     thread = None
     threading = None
 
+def _have_socket_can():
+    """Return True if this host can create a raw CAN (PF_CAN) socket."""
+    try:
+        probe = socket.socket(socket.PF_CAN, socket.SOCK_RAW, socket.CAN_RAW)
+    except (AttributeError, socket.error, OSError):
+        # Constants missing from the module, or the OS refused the socket.
+        return False
+    probe.close()
+    return True
+
+HAVE_SOCKET_CAN = _have_socket_can()
+
 # Size in bytes of the int type
 SIZEOF_INT = array.array("i").itemsize
 
@@ -80,6 +93,30 @@
             with self._cleanup_lock:
                 return super().doCleanups(*args, **kwargs)
 
+class SocketCANTest(unittest.TestCase):
+
+    """Base for tests using a raw CAN socket bound to `interface`; they are
+    skipped if the bind fails.  A `vcan0` interface can be created with:
+    # modprobe vcan
+    # ip link add dev vcan0 type vcan
+    # ifconfig vcan0 up
+    """
+    interface = 'vcan0'
+    bufsize = 128
+
+    def setUp(self):
+        self.s = socket.socket(socket.PF_CAN, socket.SOCK_RAW, socket.CAN_RAW)
+        try:
+            self.s.bind((self.interface,))
+        except socket.error:
+            self.s.close()  # skipTest() raises, so release the socket first
+            self.skipTest('network interface `%s` does not exist' %
+                           self.interface)
+
+    def tearDown(self):
+        self.s.close()
+        self.s = None
+
 class ThreadableTest:
     """Threadable Test class
 
@@ -210,6 +247,26 @@
         self.cli = None
         ThreadableTest.clientTearDown(self)
 
+class ThreadedCANSocketTest(SocketCANTest, ThreadableTest):
+
+    def __init__(self, methodName='runTest'):
+        SocketCANTest.__init__(self, methodName=methodName)
+        ThreadableTest.__init__(self)
+
+    def clientSetUp(self):
+        self.cli = socket.socket(socket.PF_CAN, socket.SOCK_RAW, socket.CAN_RAW)
+        try:
+            self.cli.bind((self.interface,))
+        except socket.error:
+            self.cli.close()  # skipTest() raises, so close the socket first
+            self.skipTest('network interface `%s` does not exist' %
+                           self.interface)
+
+    def clientTearDown(self):
+        self.cli.close()
+        self.cli = None
+        ThreadableTest.clientTearDown(self)
+
 class SocketConnectedTest(ThreadedTCPSocketTest):
     """Socket tests for client-server connection.
 
@@ -1072,6 +1129,112 @@
         srv.close()
 
 
+@unittest.skipUnless(HAVE_SOCKET_CAN, 'SocketCan required for this test.')
+class BasicCANTest(unittest.TestCase):
+
+    def testCrucialConstants(self):
+        socket.AF_CAN  # bare lookups: a missing name raises AttributeError
+        socket.PF_CAN
+        socket.CAN_RAW
+
+    def testCreateSocket(self):
+        with socket.socket(socket.PF_CAN, socket.SOCK_RAW, socket.CAN_RAW) as s:
+            pass
+
+    def testBindAny(self):
+        with socket.socket(socket.PF_CAN, socket.SOCK_RAW, socket.CAN_RAW) as s:
+            s.bind(('', ))
+
+    def testTooLongInterfaceName(self):
+        # most systems limit IFNAMSIZ to 16, take 1024 to be sure
+        with socket.socket(socket.PF_CAN, socket.SOCK_RAW, socket.CAN_RAW) as s:
+            self.assertRaisesRegex(socket.error, 'interface name too long',
+                                   s.bind, ('x' * 1024,))
+
+    @unittest.skipUnless(hasattr(socket, "CAN_RAW_LOOPBACK"),
+                         'socket.CAN_RAW_LOOPBACK required for this test.')
+    def testLoopback(self):
+        with socket.socket(socket.PF_CAN, socket.SOCK_RAW, socket.CAN_RAW) as s:
+            for loopback in (0, 1):
+                s.setsockopt(socket.SOL_CAN_RAW, socket.CAN_RAW_LOOPBACK,
+                             loopback)
+                self.assertEqual(loopback,
+                    s.getsockopt(socket.SOL_CAN_RAW, socket.CAN_RAW_LOOPBACK))
+
+    @unittest.skipUnless(hasattr(socket, "CAN_RAW_FILTER"),
+                         'socket.CAN_RAW_FILTER required for this test.')
+    def testFilter(self):
+        can_id, can_mask = 0x200, 0x700
+        can_filter = struct.pack("=II", can_id, can_mask)  # 8 bytes packed
+        with socket.socket(socket.PF_CAN, socket.SOCK_RAW, socket.CAN_RAW) as s:
+            s.setsockopt(socket.SOL_CAN_RAW, socket.CAN_RAW_FILTER, can_filter)
+            self.assertEqual(can_filter,
+                    s.getsockopt(socket.SOL_CAN_RAW, socket.CAN_RAW_FILTER, 8))
+
+
+@unittest.skipUnless(HAVE_SOCKET_CAN, 'SocketCan required for this test.')
+@unittest.skipUnless(thread, 'Threading required for this test.')
+class CANTest(ThreadedCANSocketTest):
+
+    """The CAN frame structure is defined in <linux/can.h>:
+
+    struct can_frame {
+        canid_t can_id;  /* 32 bit CAN_ID + EFF/RTR/ERR flags */
+        __u8    can_dlc; /* data length code: 0 .. 8 */
+        __u8    data[8] __attribute__((aligned(8)));
+    };
+    """
+    can_frame_fmt = "=IB3x8s"  # can_id, can_dlc, 3 pad bytes, data[8]
+
+    def __init__(self, methodName='runTest'):
+        ThreadedCANSocketTest.__init__(self, methodName=methodName)
+
+    @classmethod
+    def build_can_frame(cls, can_id, data):
+        """Pack can_id and data (zero-padded to 8 bytes) into a CAN frame."""
+        can_dlc = len(data)
+        data = data.ljust(8, b'\x00')
+        return struct.pack(cls.can_frame_fmt, can_id, can_dlc, data)
+
+    @classmethod
+    def dissect_can_frame(cls, frame):
+        """Unpack a CAN frame into (can_id, can_dlc, data[:can_dlc])."""
+        can_id, can_dlc, data = struct.unpack(cls.can_frame_fmt, frame)
+        return (can_id, can_dlc, data[:can_dlc])
+
+    def testSendFrame(self):
+        cf, addr = self.s.recvfrom(self.bufsize)
+        self.assertEqual(self.cf, cf)
+        self.assertEqual(addr[0], self.interface)
+        self.assertEqual(addr[1], socket.AF_CAN)
+
+    def _testSendFrame(self):
+        self.cf = self.build_can_frame(0x00, b'\x01\x02\x03\x04\x05')
+        self.cli.send(self.cf)
+
+    def testSendMaxFrame(self):
+        cf, addr = self.s.recvfrom(self.bufsize)
+        self.assertEqual(self.cf, cf)
+
+    def _testSendMaxFrame(self):
+        self.cf = self.build_can_frame(0x00, b'\x07' * 8)
+        self.cli.send(self.cf)
+
+    def testSendMultiFrames(self):
+        cf, addr = self.s.recvfrom(self.bufsize)
+        self.assertEqual(self.cf1, cf)
+
+        cf, addr = self.s.recvfrom(self.bufsize)
+        self.assertEqual(self.cf2, cf)
+
+    def _testSendMultiFrames(self):
+        self.cf1 = self.build_can_frame(0x07, b'\x44\x33\x22\x11')
+        self.cli.send(self.cf1)
+
+        self.cf2 = self.build_can_frame(0x12, b'\x99\x22\x33')
+        self.cli.send(self.cf2)
+
+
 @unittest.skipUnless(thread, 'Threading required for this test.')
 class BasicTCPTest(SocketConnectedTest):
 
@@ -4194,6 +4357,7 @@
     if isTipcAvailable():
         tests.append(TIPCTest)
         tests.append(TIPCThreadableTest)
+    tests.extend([BasicCANTest, CANTest])
     tests.extend([
         CmsgMacroTests,
         SendmsgUDPTest,