Lorenzo Colitti | 2fcf8fe | 2016-12-12 13:30:20 +0900 | [diff] [blame] | 1 | #!/usr/bin/python |
| 2 | # |
| 3 | # Copyright 2017 The Android Open Source Project |
| 4 | # |
| 5 | # Licensed under the Apache License, Version 2.0 (the "License"); |
| 6 | # you may not use this file except in compliance with the License. |
| 7 | # You may obtain a copy of the License at |
| 8 | # |
| 9 | # http://www.apache.org/licenses/LICENSE-2.0 |
| 10 | # |
| 11 | # Unless required by applicable law or agreed to in writing, software |
| 12 | # distributed under the License is distributed on an "AS IS" BASIS, |
| 13 | # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| 14 | # See the License for the specific language governing permissions and |
| 15 | # limitations under the License. |
| 16 | |
| 17 | import errno |
| 18 | import fcntl |
| 19 | from socket import * # pylint: disable=wildcard-import |
| 20 | import unittest |
| 21 | |
| 22 | import csocket |
| 23 | import cstruct |
| 24 | import net_test |
| 25 | |
| 26 | IPV4_LOOPBACK_ADDR = "127.0.0.1" |
| 27 | IPV6_LOOPBACK_ADDR = "::1" |
| 28 | LOOPBACK_DEV = "lo" |
| 29 | LOOPBACK_IFINDEX = 1 |
| 30 | |
| 31 | SIOCKILLADDR = 0x8939 |
| 32 | |
| 33 | |
| 34 | Ifreq = cstruct.Struct("Ifreq", "=16s16s", "name data") |
| 35 | In6Ifreq = cstruct.Struct("In6Ifreq", "=16sIi", "addr prefixlen ifindex") |
| 36 | |
| 37 | def KillAddrIoctl(addr): |
| 38 | """Calls the SIOCKILLADDR ioctl on the provided IP address. |
| 39 | |
| 40 | Args: |
| 41 | addr The IP address to pass to the ioctl. |
| 42 | |
| 43 | Raises: |
| 44 | ValueError: If addr is of an unsupported address family. |
| 45 | """ |
| 46 | family, _, _, _, _ = getaddrinfo(addr, None, AF_UNSPEC, SOCK_DGRAM, 0, |
| 47 | AI_NUMERICHOST)[0] |
| 48 | if family == AF_INET6: |
| 49 | addr = inet_pton(AF_INET6, addr) |
| 50 | ifreq = In6Ifreq((addr, 128, LOOPBACK_IFINDEX)).Pack() |
| 51 | elif family == AF_INET: |
| 52 | addr = inet_pton(AF_INET, addr) |
| 53 | sockaddr = csocket.SockaddrIn((AF_INET, 0, addr)).Pack() |
| 54 | ifreq = Ifreq((LOOPBACK_DEV, sockaddr)).Pack() |
| 55 | else: |
| 56 | raise ValueError('Address family %r not supported.' % family) |
| 57 | datagram_socket = socket(family, SOCK_DGRAM) |
| 58 | try: |
| 59 | fcntl.ioctl(datagram_socket.fileno(), SIOCKILLADDR, ifreq) |
| 60 | finally: |
| 61 | datagram_socket.close() |
| 62 | |
| 63 | # For convenience. |
| 64 | def CreateIPv4SocketPair(): |
| 65 | return net_test.CreateSocketPair(AF_INET, SOCK_STREAM, IPV4_LOOPBACK_ADDR) |
| 66 | |
| 67 | def CreateIPv6SocketPair(): |
| 68 | return net_test.CreateSocketPair(AF_INET6, SOCK_STREAM, IPV6_LOOPBACK_ADDR) |
| 69 | |
| 70 | |
| 71 | @unittest.skipUnless(net_test.LINUX_VERSION >= (4, 4, 0), "grace period") |
| 72 | class TcpNukeAddrTest(net_test.NetworkTest): |
| 73 | |
| 74 | """Tests that SIOCKILLADDR no longer exists. |
| 75 | |
| 76 | The out-of-tree SIOCKILLADDR was replaced by the upstream SOCK_DESTROY |
| 77 | operation in Linux 4.5. It was backported to common Android trees all the way |
| 78 | back to android-3.10 and is required by CTS from Android N onwards. |
| 79 | This test ensures that it no longer works in 4.4 and above kernels. |
| 80 | |
| 81 | Relevant kernel commits: |
| 82 | android-4.4: |
| 83 | 3094efd84c Revert "net: socket ioctl to reset connections matching local address" |
| 84 | """ |
| 85 | |
| 86 | def CheckNukeAddrUnsupported(self, socketpair, addr): |
| 87 | s1, s2 = socketpair |
| 88 | self.assertRaisesErrno(errno.ENOTTY, KillAddrIoctl, addr) |
| 89 | data = "foo" |
| 90 | try: |
| 91 | self.assertEquals(len(data), s1.send(data)) |
| 92 | self.assertEquals(data, s2.recv(4096)) |
| 93 | self.assertSocketsNotClosed(socketpair) |
| 94 | finally: |
| 95 | s1.close() |
| 96 | s2.close() |
| 97 | |
| 98 | def assertSocketsNotClosed(self, socketpair): |
| 99 | for sock in socketpair: |
| 100 | self.assertTrue(sock.getpeername()) |
| 101 | |
| 102 | def testIpv4Unsupported(self): |
| 103 | self.CheckNukeAddrUnsupported(CreateIPv4SocketPair(), IPV4_LOOPBACK_ADDR) |
| 104 | self.CheckNukeAddrUnsupported(CreateIPv4SocketPair(), "0.0.0.0") |
| 105 | |
| 106 | def testIpv6Unsupported(self): |
| 107 | self.CheckNukeAddrUnsupported(CreateIPv6SocketPair(), IPV6_LOOPBACK_ADDR) |
| 108 | self.CheckNukeAddrUnsupported(CreateIPv4SocketPair(), "::") |
| 109 | |
| 110 | |
| 111 | if __name__ == "__main__": |
| 112 | unittest.main() |