Merge "Add tests for cross-family bind() and connect() operations."
diff --git a/tests/net_test/ping6_test.py b/tests/net_test/ping6_test.py
index ca6ebbd..8fc82d1 100755
--- a/tests/net_test/ping6_test.py
+++ b/tests/net_test/ping6_test.py
@@ -28,6 +28,7 @@
 
 from scapy import all as scapy
 
+import csocket
 import multinetwork_base
 import net_test
 
@@ -378,6 +379,39 @@
     scapy.send(GetIPv6Unreachable(port))
     # No crash? Good.
 
+  def testCrossProtocolCalls(self):
+    """Tests that passing in the wrong family returns EAFNOSUPPORT."""
+
+    def CheckEAFNoSupport(function, *args):
+      self.assertRaisesErrno(errno.EAFNOSUPPORT, function, *args)
+
+    ipv6sockaddr = csocket.Sockaddr((net_test.IPV6_ADDR, 53))
+
+    # To check that IPv6 socket calls return EAFNOSUPPORT when passed IPv4
+    # socket address structures, the socket address length passed down must be
+    # at least sizeof(sockaddr_in6); otherwise the calls fail immediately with
+    # EINVAL. So zero-pad a sockaddr_in to the length of a sockaddr_in6, using
+    # a bytes literal since Pack() presumably returns bytes on Python 3.
+    ipv4sockaddr = csocket.Sockaddr((net_test.IPV4_ADDR, 53))
+    ipv4sockaddr = csocket.SockaddrIn6(
+        ipv4sockaddr.Pack() +
+        b"\x00" * (len(csocket.SockaddrIn6) - len(csocket.SockaddrIn)))
+
+    s4 = net_test.IPv4PingSocket()
+    s6 = net_test.IPv6PingSocket()
+
+    # We can't just call s.connect(), s.bind() etc. with a tuple of the wrong
+    # address family, because the Python implementation will just pass garbage
+    # down to the kernel. So call the C functions directly.
+    CheckEAFNoSupport(csocket.Bind, s4, ipv6sockaddr)
+    CheckEAFNoSupport(csocket.Bind, s6, ipv4sockaddr)
+    CheckEAFNoSupport(csocket.Connect, s4, ipv6sockaddr)
+    CheckEAFNoSupport(csocket.Connect, s6, ipv4sockaddr)
+    CheckEAFNoSupport(csocket.Sendmsg,
+                      s4, ipv6sockaddr, net_test.IPV4_PING, None, 0)
+    CheckEAFNoSupport(csocket.Sendmsg,
+                      s6, ipv4sockaddr, net_test.IPV6_PING, None, 0)
+
   def testIPv4Bind(self):
     # Bind to unspecified address.
     s = net_test.IPv4PingSocket()
@@ -485,6 +519,25 @@
       if e.errno == errno.EACCES:
         pass  # We're not root. let it go for now.
 
+  def testAfUnspecBind(self):
+    """Tests that AF_UNSPEC bind() is treated as IPv4 only for 0.0.0.0."""
+    s4 = net_test.IPv4PingSocket()
+    sockaddr = csocket.Sockaddr(("0.0.0.0", 12996))
+    sockaddr.family = AF_UNSPEC
+    csocket.Bind(s4, sockaddr)
+    self.assertEqual(("0.0.0.0", 12996), s4.getsockname())
+
+    # Any other IPv4 address with AF_UNSPEC is rejected with EAFNOSUPPORT.
+    sockaddr = csocket.Sockaddr(("127.0.0.1", 58234))
+    sockaddr.family = AF_UNSPEC
+    self.assertRaisesErrno(errno.EAFNOSUPPORT, csocket.Bind, s4, sockaddr)
+
+    # AF_UNSPEC is also rejected on an IPv6 ping socket.
+    s6 = net_test.IPv6PingSocket()
+    sockaddr = csocket.Sockaddr(("::1", 58997))
+    sockaddr.family = AF_UNSPEC
+    self.assertRaisesErrno(errno.EAFNOSUPPORT, csocket.Bind, s6, sockaddr)
+
   def testIPv6ScopedBind(self):
     # Can't bind to a link-local address without a scope ID.
     s = net_test.IPv6PingSocket()