| #!/usr/bin/python |
| # |
| # Copyright 2017 The Android Open Source Project |
| # |
| # Licensed under the Apache License, Version 2.0 (the "License"); |
| # you may not use this file except in compliance with the License. |
| # You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| # See the License for the specific language governing permissions and |
| # limitations under the License. |
| |
| # pylint: disable=g-bad-todo,g-bad-file-header,wildcard-import |
| from errno import * # pylint: disable=wildcard-import |
| from socket import * # pylint: disable=wildcard-import |
| |
| import random |
| import itertools |
| import struct |
| import unittest |
| |
| from scapy import all as scapy |
| from tun_twister import TunTwister |
| import csocket |
| import iproute |
| import multinetwork_base |
| import net_test |
| import packets |
| import util |
| import xfrm |
| import xfrm_base |
| |
| _LOOPBACK_IFINDEX = 1 |
| _TEST_XFRM_IFNAME = "ipsec42" |
| _TEST_XFRM_IF_ID = 42 |
| _TEST_SPI = 0x1234 |
| |
| # Does the kernel support xfrmi interfaces? |
| def HaveXfrmInterfaces(): |
| if net_test.LINUX_VERSION >= (4, 19, 0): |
| return True |
| |
| try: |
| i = iproute.IPRoute() |
| i.CreateXfrmInterface(_TEST_XFRM_IFNAME, _TEST_XFRM_IF_ID, |
| _LOOPBACK_IFINDEX) |
| i.DeleteLink(_TEST_XFRM_IFNAME) |
| try: |
| i.GetIfIndex(_TEST_XFRM_IFNAME) |
| assert "Deleted interface %s still exists!" % _TEST_XFRM_IFNAME |
| except IOError: |
| pass |
| return True |
| except IOError: |
| return False |
| |
| HAVE_XFRM_INTERFACES = HaveXfrmInterfaces() |
| |
| # Does the kernel support CONFIG_XFRM_MIGRATE? |
| def SupportsXfrmMigrate(): |
| if net_test.LINUX_VERSION >= (5, 10, 0): |
| return True |
| |
| # XFRM_MIGRATE depends on xfrmi interfaces |
| if not HAVE_XFRM_INTERFACES: |
| return False |
| |
| try: |
| x = xfrm.Xfrm() |
| wildcard_addr = net_test.GetWildcardAddress(6) |
| selector = xfrm.EmptySelector(AF_INET6) |
| |
| # Expect migration to fail with EINVAL because it is trying to migrate a |
| # non-existent SA. |
| x.MigrateTunnel(xfrm.XFRM_POLICY_OUT, selector, wildcard_addr, wildcard_addr, |
| wildcard_addr, wildcard_addr, _TEST_SPI, |
| None, None, None, None, None, None) |
| print("Migration succeeded unexpectedly, assuming XFRM_MIGRATE is enabled") |
| return True |
| except IOError as err: |
| if err.errno == ENOPROTOOPT: |
| return False |
| elif err.errno == EINVAL: |
| return True |
| else: |
| print("Unexpected error, assuming XFRM_MIGRATE is enabled:", err.errno) |
| return True |
| |
| SUPPORTS_XFRM_MIGRATE = SupportsXfrmMigrate() |
| |
| # Parameters to setup tunnels as special networks |
| _TUNNEL_NETID_OFFSET = 0xFC00 # Matches reserved netid range for IpSecService |
| _BASE_TUNNEL_NETID = {4: 40, 6: 60} |
| _BASE_VTI_OKEY = 2000000100 |
| _BASE_VTI_IKEY = 2000000200 |
| |
| _TEST_OUT_SPI = _TEST_SPI |
| _TEST_IN_SPI = _TEST_OUT_SPI |
| |
| _TEST_OKEY = 2000000100 |
| _TEST_IKEY = 2000000200 |
| |
| _TEST_REMOTE_PORT = 1234 |
| |
| _SCAPY_IP_TYPE = {4: scapy.IP, 6: scapy.IPv6} |
| |
| |
| def _GetLocalInnerAddress(version): |
| return {4: "10.16.5.15", 6: "2001:db8:1::1"}[version] |
| |
| |
| def _GetRemoteInnerAddress(version): |
| return {4: "10.16.5.20", 6: "2001:db8:2::1"}[version] |
| |
| |
| def _GetRemoteOuterAddress(version): |
| return {4: net_test.IPV4_ADDR, 6: net_test.IPV6_ADDR}[version] |
| |
| |
| def _GetNullAuthCryptTunnelModePkt(inner_version, src_inner, src_outer, |
| src_port, dst_inner, dst_outer, |
| dst_port, spi, seq_num, ip_hdr_options=None): |
| if ip_hdr_options is None: |
| ip_hdr_options = {} |
| |
| ip_hdr_options.update({'src': src_inner, 'dst': dst_inner}) |
| |
| # Build and receive an ESP packet destined for the inner socket |
| IpType = {4: scapy.IP, 6: scapy.IPv6}[inner_version] |
| input_pkt = ( |
| IpType(**ip_hdr_options) / scapy.UDP(sport=src_port, dport=dst_port) / |
| net_test.UDP_PAYLOAD) |
| input_pkt = IpType(str(input_pkt)) # Compute length, checksum. |
| input_pkt = xfrm_base.EncryptPacketWithNull(input_pkt, spi, seq_num, |
| (src_outer, dst_outer)) |
| |
| return input_pkt |
| |
| |
| def _CreateReceiveSock(version, port=0): |
| # Create a socket to receive packets. |
| read_sock = socket(net_test.GetAddressFamily(version), SOCK_DGRAM, 0) |
| read_sock.bind((net_test.GetWildcardAddress(version), port)) |
| # The second parameter of the tuple is the port number regardless of AF. |
| local_port = read_sock.getsockname()[1] |
| # Guard against the eventuality of the receive failing. |
| csocket.SetSocketTimeout(read_sock, 500) |
| |
| return read_sock, local_port |
| |
| |
| def _SendPacket(testInstance, netid, version, remote, remote_port): |
| # Send a packet out via the tunnel-backed network, bound for the port number |
| # of the input socket. |
| write_sock = socket(net_test.GetAddressFamily(version), SOCK_DGRAM, 0) |
| testInstance.SelectInterface(write_sock, netid, "mark") |
| write_sock.sendto(net_test.UDP_PAYLOAD, (remote, remote_port)) |
| local_port = write_sock.getsockname()[1] |
| |
| return local_port |
| |
| |
| def InjectTests(): |
| InjectParameterizedTests(XfrmTunnelTest) |
| InjectParameterizedTests(XfrmInterfaceTest) |
| InjectParameterizedTests(XfrmVtiTest) |
| InjectParameterizedTests(XfrmInterfaceMigrateTest) |
| |
| |
| def InjectParameterizedTests(cls): |
| VERSIONS = (4, 6) |
| param_list = itertools.product(VERSIONS, VERSIONS) |
| |
| def NameGenerator(*args): |
| return "IPv%d_in_IPv%d" % tuple(args) |
| |
| util.InjectParameterizedTest(cls, param_list, NameGenerator) |
| |
| |
| class XfrmTunnelTest(xfrm_base.XfrmLazyTest): |
| |
| def _CheckTunnelOutput(self, inner_version, outer_version, underlying_netid, |
| netid, local_inner, remote_inner, local_outer, |
| remote_outer, write_sock): |
| |
| write_sock.sendto(net_test.UDP_PAYLOAD, (remote_inner, 53)) |
| self._ExpectEspPacketOn(underlying_netid, _TEST_OUT_SPI, 1, None, |
| local_outer, remote_outer) |
| |
| def _CheckTunnelInput(self, inner_version, outer_version, underlying_netid, |
| netid, local_inner, remote_inner, local_outer, |
| remote_outer, read_sock): |
| |
| # The second parameter of the tuple is the port number regardless of AF. |
| local_port = read_sock.getsockname()[1] |
| |
| # Build and receive an ESP packet destined for the inner socket |
| input_pkt = _GetNullAuthCryptTunnelModePkt( |
| inner_version, remote_inner, remote_outer, _TEST_REMOTE_PORT, |
| local_inner, local_outer, local_port, _TEST_IN_SPI, 1) |
| self.ReceivePacketOn(underlying_netid, input_pkt) |
| |
| # Verify that the packet data and src are correct |
| data, src = read_sock.recvfrom(4096) |
| self.assertEqual(net_test.UDP_PAYLOAD, data) |
| self.assertEqual((remote_inner, _TEST_REMOTE_PORT), src[:2]) |
| |
| def _TestTunnel(self, inner_version, outer_version, func, direction, |
| test_output_mark_unset): |
| """Test a unidirectional XFRM Tunnel with explicit selectors""" |
| # Select the underlying netid, which represents the external |
| # interface from/to which to route ESP packets. |
| u_netid = self.RandomNetid() |
| # Select a random netid that will originate traffic locally and |
| # which represents the netid on which the plaintext is sent |
| netid = self.RandomNetid(exclude=u_netid) |
| |
| local_inner = self.MyAddress(inner_version, netid) |
| remote_inner = _GetRemoteInnerAddress(inner_version) |
| local_outer = self.MyAddress(outer_version, u_netid) |
| remote_outer = _GetRemoteOuterAddress(outer_version) |
| |
| output_mark = u_netid |
| if test_output_mark_unset: |
| output_mark = None |
| self.SetDefaultNetwork(u_netid) |
| |
| try: |
| # Create input/ouput SPs, SAs and sockets to simulate a more realistic |
| # environment. |
| self.xfrm.CreateTunnel( |
| xfrm.XFRM_POLICY_IN, xfrm.SrcDstSelector(remote_inner, local_inner), |
| remote_outer, local_outer, _TEST_IN_SPI, xfrm_base._ALGO_CRYPT_NULL, |
| xfrm_base._ALGO_AUTH_NULL, None, None, None, xfrm.MATCH_METHOD_ALL) |
| |
| self.xfrm.CreateTunnel( |
| xfrm.XFRM_POLICY_OUT, xfrm.SrcDstSelector(local_inner, remote_inner), |
| local_outer, remote_outer, _TEST_OUT_SPI, xfrm_base._ALGO_CBC_AES_256, |
| xfrm_base._ALGO_HMAC_SHA1, None, output_mark, None, xfrm.MATCH_METHOD_ALL) |
| |
| write_sock = socket(net_test.GetAddressFamily(inner_version), SOCK_DGRAM, 0) |
| self.SelectInterface(write_sock, netid, "mark") |
| read_sock, _ = _CreateReceiveSock(inner_version) |
| |
| sock = write_sock if direction == xfrm.XFRM_POLICY_OUT else read_sock |
| func(inner_version, outer_version, u_netid, netid, local_inner, |
| remote_inner, local_outer, remote_outer, sock) |
| finally: |
| if test_output_mark_unset: |
| self.ClearDefaultNetwork() |
| |
| def ParamTestTunnelInput(self, inner_version, outer_version): |
| self._TestTunnel(inner_version, outer_version, self._CheckTunnelInput, |
| xfrm.XFRM_POLICY_IN, False) |
| |
| def ParamTestTunnelOutput(self, inner_version, outer_version): |
| self._TestTunnel(inner_version, outer_version, self._CheckTunnelOutput, |
| xfrm.XFRM_POLICY_OUT, False) |
| |
| def ParamTestTunnelOutputNoSetMark(self, inner_version, outer_version): |
| self._TestTunnel(inner_version, outer_version, self._CheckTunnelOutput, |
| xfrm.XFRM_POLICY_OUT, True) |
| |
| |
| @unittest.skipUnless(net_test.LINUX_VERSION >= (3, 18, 0), "VTI Unsupported") |
| class XfrmAddDeleteVtiTest(xfrm_base.XfrmBaseTest): |
| def _VerifyVtiInfoData(self, vti_info_data, version, local_addr, remote_addr, |
| ikey, okey): |
| self.assertEqual(vti_info_data["IFLA_VTI_IKEY"], ikey) |
| self.assertEqual(vti_info_data["IFLA_VTI_OKEY"], okey) |
| |
| family = AF_INET if version == 4 else AF_INET6 |
| self.assertEqual(inet_ntop(family, vti_info_data["IFLA_VTI_LOCAL"]), |
| local_addr) |
| self.assertEqual(inet_ntop(family, vti_info_data["IFLA_VTI_REMOTE"]), |
| remote_addr) |
| |
| def testAddVti(self): |
| """Test the creation of a Virtual Tunnel Interface.""" |
| for version in [4, 6]: |
| netid = self.RandomNetid() |
| local_addr = self.MyAddress(version, netid) |
| self.iproute.CreateVirtualTunnelInterface( |
| dev_name=_TEST_XFRM_IFNAME, |
| local_addr=local_addr, |
| remote_addr=_GetRemoteOuterAddress(version), |
| o_key=_TEST_OKEY, |
| i_key=_TEST_IKEY) |
| self._VerifyVtiInfoData( |
| self.iproute.GetIfinfoData(_TEST_XFRM_IFNAME), version, local_addr, |
| _GetRemoteOuterAddress(version), _TEST_IKEY, _TEST_OKEY) |
| |
| new_remote_addr = {4: net_test.IPV4_ADDR2, 6: net_test.IPV6_ADDR2} |
| new_okey = _TEST_OKEY + _TEST_XFRM_IF_ID |
| new_ikey = _TEST_IKEY + _TEST_XFRM_IF_ID |
| self.iproute.CreateVirtualTunnelInterface( |
| dev_name=_TEST_XFRM_IFNAME, |
| local_addr=local_addr, |
| remote_addr=new_remote_addr[version], |
| o_key=new_okey, |
| i_key=new_ikey, |
| is_update=True) |
| |
| self._VerifyVtiInfoData( |
| self.iproute.GetIfinfoData(_TEST_XFRM_IFNAME), version, local_addr, |
| new_remote_addr[version], new_ikey, new_okey) |
| |
| if_index = self.iproute.GetIfIndex(_TEST_XFRM_IFNAME) |
| |
| # Validate that the netlink interface matches the ioctl interface. |
| self.assertEqual(net_test.GetInterfaceIndex(_TEST_XFRM_IFNAME), if_index) |
| self.iproute.DeleteLink(_TEST_XFRM_IFNAME) |
| with self.assertRaises(IOError): |
| self.iproute.GetIfIndex(_TEST_XFRM_IFNAME) |
| |
| def _QuietDeleteLink(self, ifname): |
| try: |
| self.iproute.DeleteLink(ifname) |
| except IOError: |
| # The link was not present. |
| pass |
| |
| def tearDown(self): |
| super(XfrmAddDeleteVtiTest, self).tearDown() |
| self._QuietDeleteLink(_TEST_XFRM_IFNAME) |
| |
| |
| class SaInfo(object): |
| |
| def __init__(self, spi): |
| self.spi = spi |
| self.seq_num = 1 |
| |
| |
| class IpSecBaseInterface(object): |
| |
| def __init__(self, iface, netid, underlying_netid, local, remote, version): |
| self.iface = iface |
| self.netid = netid |
| self.underlying_netid = underlying_netid |
| self.local, self.remote = local, remote |
| |
| # XFRM interfaces technically do not have a version. This keeps track of |
| # the IP version of the local and remote addresses. |
| self.version = version |
| self.rx = self.tx = 0 |
| self.addrs = {} |
| |
| self.iproute = iproute.IPRoute() |
| self.xfrm = xfrm.Xfrm() |
| |
| def Teardown(self): |
| self.TeardownXfrm() |
| self.TeardownInterface() |
| |
| def TeardownInterface(self): |
| self.iproute.DeleteLink(self.iface) |
| |
| def SetupXfrm(self, use_null_crypt): |
| rand_spi = random.randint(0, 0x7fffffff) |
| self.in_sa = SaInfo(rand_spi) |
| self.out_sa = SaInfo(rand_spi) |
| |
| # Select algorithms: |
| if use_null_crypt: |
| auth, crypt = xfrm_base._ALGO_AUTH_NULL, xfrm_base._ALGO_CRYPT_NULL |
| else: |
| auth, crypt = xfrm_base._ALGO_HMAC_SHA1, xfrm_base._ALGO_CBC_AES_256 |
| |
| self.auth = auth |
| self.crypt = crypt |
| |
| self._SetupXfrmByType(auth, crypt) |
| |
| def Rekey(self, outer_family, new_out_sa, new_in_sa): |
| """Rekeys the Tunnel Interface |
| |
| Creates new SAs and updates the outbound security policy to use new SAs. |
| |
| Args: |
| outer_family: AF_INET or AF_INET6 |
| new_out_sa: An SaInfo struct representing the new outbound SA's info |
| new_in_sa: An SaInfo struct representing the new inbound SA's info |
| """ |
| self._Rekey(outer_family, new_out_sa, new_in_sa) |
| |
| # Update Interface object |
| self.out_sa = new_out_sa |
| self.in_sa = new_in_sa |
| |
| def TeardownXfrm(self): |
| raise NotImplementedError("Subclasses should implement this") |
| |
| def _SetupXfrmByType(self, auth_algo, crypt_algo): |
| raise NotImplementedError("Subclasses should implement this") |
| |
| def _Rekey(self, outer_family, new_out_sa, new_in_sa): |
| raise NotImplementedError("Subclasses should implement this") |
| |
| |
| class VtiInterface(IpSecBaseInterface): |
| |
| def __init__(self, iface, netid, underlying_netid, _, local, remote, version): |
| super(VtiInterface, self).__init__(iface, netid, underlying_netid, local, |
| remote, version) |
| |
| self.ikey = _TEST_IKEY + netid |
| self.okey = _TEST_OKEY + netid |
| |
| self.SetupInterface() |
| self.SetupXfrm(False) |
| |
| def SetupInterface(self): |
| return self.iproute.CreateVirtualTunnelInterface( |
| self.iface, self.local, self.remote, self.ikey, self.okey) |
| |
| def _SetupXfrmByType(self, auth_algo, crypt_algo): |
| # For the VTI, the selectors are wildcard since packets will only |
| # be selected if they have the appropriate mark, hence the inner |
| # addresses are wildcard. |
| self.xfrm.CreateTunnel(xfrm.XFRM_POLICY_OUT, None, self.local, self.remote, |
| self.out_sa.spi, crypt_algo, auth_algo, |
| xfrm.ExactMatchMark(self.okey), |
| self.underlying_netid, None, xfrm.MATCH_METHOD_ALL) |
| |
| self.xfrm.CreateTunnel(xfrm.XFRM_POLICY_IN, None, self.remote, self.local, |
| self.in_sa.spi, crypt_algo, auth_algo, |
| xfrm.ExactMatchMark(self.ikey), None, None, |
| xfrm.MATCH_METHOD_MARK) |
| |
| def TeardownXfrm(self): |
| self.xfrm.DeleteTunnel(xfrm.XFRM_POLICY_OUT, None, self.remote, |
| self.out_sa.spi, self.okey, None) |
| self.xfrm.DeleteTunnel(xfrm.XFRM_POLICY_IN, None, self.local, |
| self.in_sa.spi, self.ikey, None) |
| |
| def _Rekey(self, outer_family, new_out_sa, new_in_sa): |
| # TODO: Consider ways to share code with xfrm.CreateTunnel(). It's mostly |
| # the same, but rekeys are asymmetric, and only update the outbound |
| # policy. |
| self.xfrm.AddSaInfo(self.local, self.remote, new_out_sa.spi, |
| xfrm.XFRM_MODE_TUNNEL, 0, xfrm_base._ALGO_CRYPT_NULL, |
| xfrm_base._ALGO_AUTH_NULL, None, None, |
| xfrm.ExactMatchMark(self.okey), self.underlying_netid) |
| |
| self.xfrm.AddSaInfo(self.remote, self.local, new_in_sa.spi, |
| xfrm.XFRM_MODE_TUNNEL, 0, xfrm_base._ALGO_CRYPT_NULL, |
| xfrm_base._ALGO_AUTH_NULL, None, None, |
| xfrm.ExactMatchMark(self.ikey), None) |
| |
| # Create new policies for IPv4 and IPv6. |
| for sel in [xfrm.EmptySelector(AF_INET), xfrm.EmptySelector(AF_INET6)]: |
| # Add SPI-specific output policy to enforce using new outbound SPI |
| policy = xfrm.UserPolicy(xfrm.XFRM_POLICY_OUT, sel) |
| tmpl = xfrm.UserTemplate(outer_family, new_out_sa.spi, 0, |
| (self.local, self.remote)) |
| self.xfrm.UpdatePolicyInfo(policy, tmpl, xfrm.ExactMatchMark(self.okey), |
| 0) |
| |
| def DeleteOldSaInfo(self, outer_family, old_in_spi, old_out_spi): |
| self.xfrm.DeleteSaInfo(self.local, old_in_spi, IPPROTO_ESP, |
| xfrm.ExactMatchMark(self.ikey)) |
| self.xfrm.DeleteSaInfo(self.remote, old_out_spi, IPPROTO_ESP, |
| xfrm.ExactMatchMark(self.okey)) |
| |
| |
| @unittest.skipUnless(HAVE_XFRM_INTERFACES, "XFRM interfaces unsupported") |
| class XfrmAddDeleteXfrmInterfaceTest(xfrm_base.XfrmBaseTest): |
| """Test the creation of an XFRM Interface.""" |
| |
| def testAddXfrmInterface(self): |
| self.iproute.CreateXfrmInterface(_TEST_XFRM_IFNAME, _TEST_XFRM_IF_ID, |
| _LOOPBACK_IFINDEX) |
| if_index = self.iproute.GetIfIndex(_TEST_XFRM_IFNAME) |
| net_test.SetInterfaceUp(_TEST_XFRM_IFNAME) |
| |
| # Validate that the netlink interface matches the ioctl interface. |
| self.assertEqual(net_test.GetInterfaceIndex(_TEST_XFRM_IFNAME), if_index) |
| self.iproute.DeleteLink(_TEST_XFRM_IFNAME) |
| with self.assertRaises(IOError): |
| self.iproute.GetIfIndex(_TEST_XFRM_IFNAME) |
| |
| |
| class XfrmInterface(IpSecBaseInterface): |
| |
| def __init__(self, iface, netid, underlying_netid, ifindex, local, remote, |
| version, use_null_crypt=False): |
| super(XfrmInterface, self).__init__(iface, netid, underlying_netid, local, |
| remote, version) |
| |
| self.ifindex = ifindex |
| self.xfrm_if_id = netid |
| |
| self.SetupInterface() |
| self.SetupXfrm(use_null_crypt) |
| |
| def SetupInterface(self): |
| """Create an XFRM interface.""" |
| return self.iproute.CreateXfrmInterface(self.iface, self.netid, self.ifindex) |
| |
| def _SetupXfrmByType(self, auth_algo, crypt_algo): |
| self.xfrm.CreateTunnel(xfrm.XFRM_POLICY_OUT, None, self.local, self.remote, |
| self.out_sa.spi, crypt_algo, auth_algo, None, |
| self.underlying_netid, self.xfrm_if_id, |
| xfrm.MATCH_METHOD_ALL) |
| self.xfrm.CreateTunnel(xfrm.XFRM_POLICY_IN, None, self.remote, self.local, |
| self.in_sa.spi, crypt_algo, auth_algo, None, None, |
| self.xfrm_if_id, xfrm.MATCH_METHOD_IFID) |
| |
| def TeardownXfrm(self): |
| self.xfrm.DeleteTunnel(xfrm.XFRM_POLICY_OUT, None, self.remote, |
| self.out_sa.spi, None, self.xfrm_if_id) |
| self.xfrm.DeleteTunnel(xfrm.XFRM_POLICY_IN, None, self.local, |
| self.in_sa.spi, None, self.xfrm_if_id) |
| |
| def _Rekey(self, outer_family, new_out_sa, new_in_sa): |
| # TODO: Consider ways to share code with xfrm.CreateTunnel(). It's mostly |
| # the same, but rekeys are asymmetric, and only update the outbound |
| # policy. |
| self.xfrm.AddSaInfo( |
| self.local, self.remote, new_out_sa.spi, xfrm.XFRM_MODE_TUNNEL, 0, |
| xfrm_base._ALGO_CRYPT_NULL, xfrm_base._ALGO_AUTH_NULL, None, None, |
| None, self.underlying_netid, xfrm_if_id=self.xfrm_if_id) |
| |
| self.xfrm.AddSaInfo( |
| self.remote, self.local, new_in_sa.spi, xfrm.XFRM_MODE_TUNNEL, 0, |
| xfrm_base._ALGO_CRYPT_NULL, xfrm_base._ALGO_AUTH_NULL, None, None, |
| None, None, xfrm_if_id=self.xfrm_if_id) |
| |
| # Create new policies for IPv4 and IPv6. |
| for sel in [xfrm.EmptySelector(AF_INET), xfrm.EmptySelector(AF_INET6)]: |
| # Add SPI-specific output policy to enforce using new outbound SPI |
| policy = xfrm.UserPolicy(xfrm.XFRM_POLICY_OUT, sel) |
| tmpl = xfrm.UserTemplate(outer_family, new_out_sa.spi, 0, |
| (self.local, self.remote)) |
| self.xfrm.UpdatePolicyInfo(policy, tmpl, None, self.xfrm_if_id) |
| |
| def DeleteOldSaInfo(self, outer_family, old_in_spi, old_out_spi): |
| self.xfrm.DeleteSaInfo(self.local, old_in_spi, IPPROTO_ESP, None, |
| self.xfrm_if_id) |
| self.xfrm.DeleteSaInfo(self.remote, old_out_spi, IPPROTO_ESP, None, |
| self.xfrm_if_id) |
| |
| def Migrate(self, new_underlying_netid, new_local, new_remote): |
| self.xfrm.MigrateTunnel(xfrm.XFRM_POLICY_IN, None, self.remote, self.local, |
| new_remote, new_local, self.in_sa.spi, |
| self.crypt, self.auth, None, None, |
| new_underlying_netid, self.xfrm_if_id) |
| |
| self.xfrm.MigrateTunnel(xfrm.XFRM_POLICY_OUT, None, self.local, self.remote, |
| new_local, new_remote, self.out_sa.spi, |
| self.crypt, self.auth, None, None, |
| new_underlying_netid, self.xfrm_if_id) |
| |
| self.local = new_local |
| self.remote = new_remote |
| self.underlying_netid = new_underlying_netid |
| |
| |
| class XfrmTunnelBase(xfrm_base.XfrmBaseTest): |
| |
| # Subclass that does not allow multiple tunnels (e.g. XfrmInterfaceMigrateTest) |
| # should override this method. |
| @classmethod |
| def allowMultipleTunnels(cls): |
| return True |
| |
| @classmethod |
| def setUpClass(cls): |
| xfrm_base.XfrmBaseTest.setUpClass() |
| # Tunnel interfaces use marks extensively, so configure realistic packet |
| # marking rules to make the test representative, make PMTUD work, etc. |
| cls.SetInboundMarks(True) |
| cls.SetMarkReflectSysctls(1) |
| |
| # Group by tunnel version to ensure that we test at least one IPv4 and one |
| # IPv6 tunnel |
| cls.tunnelsV4 = {} |
| cls.tunnelsV6 = {} |
| |
| if not cls.allowMultipleTunnels(): |
| return |
| |
| for i, underlying_netid in enumerate(cls.tuns): |
| for version in 4, 6: |
| netid = _BASE_TUNNEL_NETID[version] + _TUNNEL_NETID_OFFSET + i |
| iface = "ipsec%s" % netid |
| local = cls.MyAddress(version, underlying_netid) |
| if version == 4: |
| remote = (net_test.IPV4_ADDR if (i % 2) else net_test.IPV4_ADDR2) |
| else: |
| remote = (net_test.IPV6_ADDR if (i % 2) else net_test.IPV6_ADDR2) |
| |
| ifindex = cls.ifindices[underlying_netid] |
| tunnel = cls.INTERFACE_CLASS(iface, netid, underlying_netid, ifindex, |
| local, remote, version) |
| cls._SetInboundMarking(netid, iface, True) |
| cls._SetupTunnelNetwork(tunnel, True) |
| |
| if version == 4: |
| cls.tunnelsV4[netid] = tunnel |
| else: |
| cls.tunnelsV6[netid] = tunnel |
| |
| @classmethod |
| def tearDownClass(cls): |
| # The sysctls are restored by MultinetworkBaseTest.tearDownClass. |
| cls.SetInboundMarks(False) |
| for tunnel in list(cls.tunnelsV4.values()) + list(cls.tunnelsV6.values()): |
| cls._SetInboundMarking(tunnel.netid, tunnel.iface, False) |
| cls._SetupTunnelNetwork(tunnel, False) |
| tunnel.Teardown() |
| xfrm_base.XfrmBaseTest.tearDownClass() |
| |
| def randomTunnel(self, outer_version): |
| version_dict = self.tunnelsV4 if outer_version == 4 else self.tunnelsV6 |
| return random.choice(list(version_dict.values())) |
| |
| def setUp(self): |
| multinetwork_base.MultiNetworkBaseTest.setUp(self) |
| self.iproute = iproute.IPRoute() |
| self.xfrm = xfrm.Xfrm() |
| |
| def tearDown(self): |
| multinetwork_base.MultiNetworkBaseTest.tearDown(self) |
| |
| def _SwapInterfaceAddress(self, ifname, old_addr, new_addr): |
| """Exchange two addresses on a given interface. |
| |
| Args: |
| ifname: Name of the interface |
| old_addr: An address to be removed from the interface |
| new_addr: An address to be added to an interface |
| """ |
| version = 6 if ":" in new_addr else 4 |
| ifindex = net_test.GetInterfaceIndex(ifname) |
| self.iproute.AddAddress(new_addr, |
| net_test.AddressLengthBits(version), ifindex) |
| self.iproute.DelAddress(old_addr, |
| net_test.AddressLengthBits(version), ifindex) |
| |
| @classmethod |
| def _GetLocalAddress(cls, version, netid): |
| if version == 4: |
| return cls._MyIPv4Address(netid - _TUNNEL_NETID_OFFSET) |
| else: |
| return cls.OnlinkPrefix(6, netid - _TUNNEL_NETID_OFFSET) + "1" |
| |
| @classmethod |
| def _SetupTunnelNetwork(cls, tunnel, is_add): |
| """Setup rules and routes for a tunnel Network. |
| |
| Takes an interface and depending on the boolean |
| value of is_add, either adds or removes the rules |
| and routes for a tunnel interface to behave like an |
| Android Network for purposes of testing. |
| |
| Args: |
| tunnel: A VtiInterface or XfrmInterface, the tunnel to set up. |
| is_add: Boolean that causes this method to perform setup if True or |
| teardown if False |
| """ |
| if is_add: |
| # Disable router solicitations to avoid occasional spurious packets |
| # arriving on the underlying network; there are two possible behaviors |
| # when that occurred: either only the RA packet is read, and when it |
| # is echoed back to the tunnel, it causes the test to fail by not |
| # receiving # the UDP_PAYLOAD; or, two packets may arrive on the |
| # underlying # network which fails the assertion that only one ESP packet |
| # is received. |
| cls.SetSysctl( |
| "/proc/sys/net/ipv6/conf/%s/router_solicitations" % tunnel.iface, 0) |
| net_test.SetInterfaceUp(tunnel.iface) |
| |
| for version in [4, 6]: |
| ifindex = net_test.GetInterfaceIndex(tunnel.iface) |
| table = tunnel.netid |
| |
| # Set up routing rules. |
| start, end = cls.UidRangeForNetid(tunnel.netid) |
| cls.iproute.UidRangeRule(version, is_add, start, end, table, |
| cls.PRIORITY_UID) |
| cls.iproute.OifRule(version, is_add, tunnel.iface, table, cls.PRIORITY_OIF) |
| cls.iproute.FwmarkRule(version, is_add, tunnel.netid, cls.NETID_FWMASK, |
| table, cls.PRIORITY_FWMARK) |
| |
| # Configure IP addresses. |
| addr = cls._GetLocalAddress(version, tunnel.netid) |
| prefixlen = net_test.AddressLengthBits(version) |
| tunnel.addrs[version] = addr |
| if is_add: |
| cls.iproute.AddAddress(addr, prefixlen, ifindex) |
| cls.iproute.AddRoute(version, table, "default", 0, None, ifindex) |
| else: |
| cls.iproute.DelRoute(version, table, "default", 0, None, ifindex) |
| cls.iproute.DelAddress(addr, prefixlen, ifindex) |
| |
| def assertReceivedPacket(self, tunnel, sa_info): |
| tunnel.rx += 1 |
| self.assertEqual((tunnel.rx, tunnel.tx), |
| self.iproute.GetRxTxPackets(tunnel.iface)) |
| sa_info.seq_num += 1 |
| |
| def assertSentPacket(self, tunnel, sa_info): |
| tunnel.tx += 1 |
| self.assertEqual((tunnel.rx, tunnel.tx), |
| self.iproute.GetRxTxPackets(tunnel.iface)) |
| sa_info.seq_num += 1 |
| |
| def _CheckTunnelInput(self, tunnel, inner_version, local_inner, remote_inner, |
| sa_info=None, expect_fail=False): |
| """Test null-crypt input path over an IPsec interface.""" |
| if sa_info is None: |
| sa_info = tunnel.in_sa |
| read_sock, local_port = _CreateReceiveSock(inner_version) |
| |
| input_pkt = _GetNullAuthCryptTunnelModePkt( |
| inner_version, remote_inner, tunnel.remote, _TEST_REMOTE_PORT, |
| local_inner, tunnel.local, local_port, sa_info.spi, sa_info.seq_num) |
| self.ReceivePacketOn(tunnel.underlying_netid, input_pkt) |
| |
| if expect_fail: |
| self.assertRaisesErrno(EAGAIN, read_sock.recv, 4096) |
| else: |
| # Verify that the packet data and src are correct |
| data, src = read_sock.recvfrom(4096) |
| self.assertReceivedPacket(tunnel, sa_info) |
| self.assertEqual(net_test.UDP_PAYLOAD, data) |
| self.assertEqual((remote_inner, _TEST_REMOTE_PORT), src[:2]) |
| |
| def _CheckTunnelOutput(self, tunnel, inner_version, local_inner, |
| remote_inner, sa_info=None): |
| """Test null-crypt output path over an IPsec interface.""" |
| if sa_info is None: |
| sa_info = tunnel.out_sa |
| local_port = _SendPacket(self, tunnel.netid, inner_version, remote_inner, |
| _TEST_REMOTE_PORT) |
| |
| # Read a tunneled IP packet on the underlying (outbound) network |
| # verifying that it is an ESP packet. |
| pkt = self._ExpectEspPacketOn(tunnel.underlying_netid, sa_info.spi, |
| sa_info.seq_num, None, tunnel.local, |
| tunnel.remote) |
| |
| # Get and update the IP headers on the inner payload so that we can do a simple |
| # comparison of byte data. Unfortunately, due to the scapy version this runs on, |
| # we cannot parse past the ESP header to the inner IP header, and thus have to |
| # workaround in this manner |
| if inner_version == 4: |
| ip_hdr_options = { |
| 'id': scapy.IP(str(pkt.payload)[8:]).id, |
| 'flags': scapy.IP(str(pkt.payload)[8:]).flags |
| } |
| else: |
| ip_hdr_options = {'fl': scapy.IPv6(str(pkt.payload)[8:]).fl} |
| |
| expected = _GetNullAuthCryptTunnelModePkt( |
| inner_version, local_inner, tunnel.local, local_port, remote_inner, |
| tunnel.remote, _TEST_REMOTE_PORT, sa_info.spi, sa_info.seq_num, |
| ip_hdr_options) |
| |
| # Check outer header manually (Avoids having to overwrite outer header's |
| # id, flags or flow label) |
| self.assertSentPacket(tunnel, sa_info) |
| self.assertEqual(expected.src, pkt.src) |
| self.assertEqual(expected.dst, pkt.dst) |
| self.assertEqual(len(expected), len(pkt)) |
| |
| # Check everything else |
| self.assertEqual(str(expected.payload), str(pkt.payload)) |
| |
| def _CheckTunnelEncryption(self, tunnel, inner_version, local_inner, |
| remote_inner): |
| """Test both input and output paths over an encrypted IPsec interface. |
| |
| This tests specifically makes sure that the both encryption and decryption |
| work together, as opposed to the _CheckTunnel(Input|Output) where the |
| input and output paths are tested separately, and using null encryption. |
| """ |
| src_port = _SendPacket(self, tunnel.netid, inner_version, remote_inner, |
| _TEST_REMOTE_PORT) |
| |
| # Make sure it appeared on the underlying interface |
| pkt = self._ExpectEspPacketOn(tunnel.underlying_netid, tunnel.out_sa.spi, |
| tunnel.out_sa.seq_num, None, tunnel.local, |
| tunnel.remote) |
| |
| # Check that packet is not sent in plaintext |
| self.assertTrue(str(net_test.UDP_PAYLOAD) not in str(pkt)) |
| |
| # Check src/dst |
| self.assertEqual(tunnel.local, pkt.src) |
| self.assertEqual(tunnel.remote, pkt.dst) |
| |
| # Check that the interface statistics recorded the outbound packet |
| self.assertSentPacket(tunnel, tunnel.out_sa) |
| |
| try: |
| # Swap the interface addresses to pretend we are the remote |
| self._SwapInterfaceAddress( |
| tunnel.iface, new_addr=remote_inner, old_addr=local_inner) |
| |
| # Swap the packet's IP headers and write it back to the underlying |
| # network. |
| pkt = TunTwister.TwistPacket(pkt) |
| read_sock, local_port = _CreateReceiveSock(inner_version, |
| _TEST_REMOTE_PORT) |
| self.ReceivePacketOn(tunnel.underlying_netid, pkt) |
| |
| # Verify that the packet data and src are correct |
| data, src = read_sock.recvfrom(4096) |
| self.assertEqual(net_test.UDP_PAYLOAD, data) |
| self.assertEqual((local_inner, src_port), src[:2]) |
| |
| # Check that the interface statistics recorded the inbound packet |
| self.assertReceivedPacket(tunnel, tunnel.in_sa) |
| finally: |
| # Swap the interface addresses to pretend we are the remote |
| self._SwapInterfaceAddress( |
| tunnel.iface, new_addr=local_inner, old_addr=remote_inner) |
| |
| def _CheckTunnelIcmp(self, tunnel, inner_version, local_inner, remote_inner, |
| sa_info=None): |
| """Test ICMP error path over an IPsec interface.""" |
| if sa_info is None: |
| sa_info = tunnel.out_sa |
| # Now attempt to provoke an ICMP error. |
| # TODO: deduplicate with multinetwork_test.py. |
| dst_prefix, intermediate = { |
| 4: ("172.19.", "172.16.9.12"), |
| 6: ("2001:db8::", "2001:db8::1") |
| }[tunnel.version] |
| |
| local_port = _SendPacket(self, tunnel.netid, inner_version, remote_inner, |
| _TEST_REMOTE_PORT) |
| pkt = self._ExpectEspPacketOn(tunnel.underlying_netid, sa_info.spi, |
| sa_info.seq_num, None, tunnel.local, |
| tunnel.remote) |
| self.assertSentPacket(tunnel, sa_info) |
| |
| myaddr = self.MyAddress(tunnel.version, tunnel.underlying_netid) |
| _, toobig = packets.ICMPPacketTooBig(tunnel.version, intermediate, myaddr, |
| pkt) |
| self.ReceivePacketOn(tunnel.underlying_netid, toobig) |
| |
| # Check that the packet too big reduced the MTU. |
| routes = self.iproute.GetRoutes(tunnel.remote, 0, tunnel.underlying_netid, None) |
| self.assertEqual(1, len(routes)) |
| rtmsg, attributes = routes[0] |
| self.assertEqual(iproute.RTN_UNICAST, rtmsg.type) |
| self.assertEqual(packets.PTB_MTU, attributes["RTA_METRICS"]["RTAX_MTU"]) |
| |
| # Clear PMTU information so that future tests don't have to worry about it. |
| self.InvalidateDstCache(tunnel.version, tunnel.underlying_netid) |
| |
| def _CheckTunnelEncryptionWithIcmp(self, tunnel, inner_version, local_inner, |
| remote_inner): |
| """Test combined encryption path with ICMP errors over an IPsec tunnel""" |
| self._CheckTunnelEncryption(tunnel, inner_version, local_inner, |
| remote_inner) |
| self._CheckTunnelIcmp(tunnel, inner_version, local_inner, remote_inner) |
| self._CheckTunnelEncryption(tunnel, inner_version, local_inner, |
| remote_inner) |
| |
| def _TestTunnel(self, inner_version, outer_version, func, use_null_crypt): |
| """Bootstrap method to setup and run tests for the given parameters.""" |
| tunnel = self.randomTunnel(outer_version) |
| |
| try: |
| # Some tests require that the out_seq_num and in_seq_num are the same |
| # (Specifically encrypted tests), rebuild SAs to ensure seq_num is 1 |
| # |
| # Until we get better scapy support, the only way we can build an |
| # encrypted packet is to send it out, and read the packet from the wire. |
| # We then generally use this as the "inbound" encrypted packet, injecting |
| # it into the interface for which it is expected on. |
| # |
| # As such, this is required to ensure that encrypted packets (which we |
| # currently have no way to easily modify) are not considered replay |
| # attacks by the inbound SA. (eg: received 3 packets, seq_num_in = 3, |
| # sent only 1, # seq_num_out = 1, inbound SA would consider this a replay |
| # attack) |
| tunnel.TeardownXfrm() |
| tunnel.SetupXfrm(use_null_crypt) |
| |
| local_inner = tunnel.addrs[inner_version] |
| remote_inner = _GetRemoteInnerAddress(inner_version) |
| |
| for i in range(2): |
| func(tunnel, inner_version, local_inner, remote_inner) |
| finally: |
| if use_null_crypt: |
| tunnel.TeardownXfrm() |
| tunnel.SetupXfrm(False) |
| |
| def _CheckTunnelRekey(self, tunnel, inner_version, local_inner, remote_inner): |
| old_out_sa = tunnel.out_sa |
| old_in_sa = tunnel.in_sa |
| |
| # Check to make sure that both directions work before rekey |
| self._CheckTunnelInput(tunnel, inner_version, local_inner, remote_inner, |
| old_in_sa) |
| self._CheckTunnelOutput(tunnel, inner_version, local_inner, remote_inner, |
| old_out_sa) |
| |
| # Rekey |
| outer_family = net_test.GetAddressFamily(tunnel.version) |
| |
| # Create new SA |
| # Distinguish the new SAs with new SPIs. |
| new_out_sa = SaInfo(old_out_sa.spi + 1) |
| new_in_sa = SaInfo(old_in_sa.spi + 1) |
| |
| # Perform Rekey |
| tunnel.Rekey(outer_family, new_out_sa, new_in_sa) |
| |
| # Expect that the old SPI still works for inbound packets |
| self._CheckTunnelInput(tunnel, inner_version, local_inner, remote_inner, |
| old_in_sa) |
| |
| # Test both paths with new SPIs, expect outbound to use new SPI |
| self._CheckTunnelInput(tunnel, inner_version, local_inner, remote_inner, |
| new_in_sa) |
| self._CheckTunnelOutput(tunnel, inner_version, local_inner, remote_inner, |
| new_out_sa) |
| |
| # Delete old SAs |
| tunnel.DeleteOldSaInfo(outer_family, old_in_sa.spi, old_out_sa.spi) |
| |
| # Test both paths with new SPIs; should still work |
| self._CheckTunnelInput(tunnel, inner_version, local_inner, remote_inner, |
| new_in_sa) |
| self._CheckTunnelOutput(tunnel, inner_version, local_inner, remote_inner, |
| new_out_sa) |
| |
| # Expect failure upon trying to receive a packet with the deleted SPI |
| self._CheckTunnelInput(tunnel, inner_version, local_inner, remote_inner, |
| old_in_sa, True) |
| |
| def _TestTunnelRekey(self, inner_version, outer_version): |
| """Test packet input and output over a Virtual Tunnel Interface.""" |
| tunnel = self.randomTunnel(outer_version) |
| |
| try: |
| # Always use null_crypt, so we can check input and output separately |
| tunnel.TeardownXfrm() |
| tunnel.SetupXfrm(True) |
| |
| local_inner = tunnel.addrs[inner_version] |
| remote_inner = _GetRemoteInnerAddress(inner_version) |
| |
| self._CheckTunnelRekey(tunnel, inner_version, local_inner, remote_inner) |
| finally: |
| tunnel.TeardownXfrm() |
| tunnel.SetupXfrm(False) |
| |
| |
| @unittest.skipUnless(net_test.LINUX_VERSION >= (3, 18, 0), "VTI Unsupported") |
| class XfrmVtiTest(XfrmTunnelBase): |
| |
| INTERFACE_CLASS = VtiInterface |
| |
| def ParamTestVtiInput(self, inner_version, outer_version): |
| self._TestTunnel(inner_version, outer_version, self._CheckTunnelInput, True) |
| |
| def ParamTestVtiOutput(self, inner_version, outer_version): |
| self._TestTunnel(inner_version, outer_version, self._CheckTunnelOutput, |
| True) |
| |
| def ParamTestVtiInOutEncrypted(self, inner_version, outer_version): |
| self._TestTunnel(inner_version, outer_version, self._CheckTunnelEncryption, |
| False) |
| |
| def ParamTestVtiIcmp(self, inner_version, outer_version): |
| self._TestTunnel(inner_version, outer_version, self._CheckTunnelIcmp, False) |
| |
| def ParamTestVtiEncryptionWithIcmp(self, inner_version, outer_version): |
| self._TestTunnel(inner_version, outer_version, |
| self._CheckTunnelEncryptionWithIcmp, False) |
| |
| def ParamTestVtiRekey(self, inner_version, outer_version): |
| self._TestTunnelRekey(inner_version, outer_version) |
| |
| |
| @unittest.skipUnless(HAVE_XFRM_INTERFACES, "XFRM interfaces unsupported") |
| class XfrmInterfaceTest(XfrmTunnelBase): |
| |
| INTERFACE_CLASS = XfrmInterface |
| |
| def ParamTestXfrmIntfInput(self, inner_version, outer_version): |
| self._TestTunnel(inner_version, outer_version, self._CheckTunnelInput, True) |
| |
| def ParamTestXfrmIntfOutput(self, inner_version, outer_version): |
| self._TestTunnel(inner_version, outer_version, self._CheckTunnelOutput, |
| True) |
| |
| def ParamTestXfrmIntfInOutEncrypted(self, inner_version, outer_version): |
| self._TestTunnel(inner_version, outer_version, self._CheckTunnelEncryption, |
| False) |
| |
| def ParamTestXfrmIntfIcmp(self, inner_version, outer_version): |
| self._TestTunnel(inner_version, outer_version, self._CheckTunnelIcmp, False) |
| |
| def ParamTestXfrmIntfEncryptionWithIcmp(self, inner_version, outer_version): |
| self._TestTunnel(inner_version, outer_version, |
| self._CheckTunnelEncryptionWithIcmp, False) |
| |
| def ParamTestXfrmIntfRekey(self, inner_version, outer_version): |
| self._TestTunnelRekey(inner_version, outer_version) |
| |
| @unittest.skipUnless(SUPPORTS_XFRM_MIGRATE, "XFRM migration unsupported") |
| class XfrmInterfaceMigrateTest(XfrmTunnelBase): |
| # TODO: b/172497215 There is a kernel issue that XFRM_MIGRATE cannot work correctly |
| # when there are multiple tunnels with the same selectors. Thus before this issue |
| # is fixed, #allowMultipleTunnels must be overridden to avoid setting up multiple |
| # tunnels. This need to be removed after the kernel issue is fixed. |
| @classmethod |
| def allowMultipleTunnels(cls): |
| return False |
| |
| def setUpTunnel(self, outer_version, use_null_crypt): |
| underlying_netid = self.RandomNetid() |
| netid = _BASE_TUNNEL_NETID[outer_version] + _TUNNEL_NETID_OFFSET |
| iface = "ipsec%s" % netid |
| ifindex = self.ifindices[underlying_netid] |
| |
| local = self.MyAddress(outer_version, underlying_netid) |
| remote = net_test.IPV4_ADDR if outer_version == 4 else net_test.IPV6_ADDR |
| |
| tunnel = XfrmInterface(iface, netid, underlying_netid, ifindex, |
| local, remote, outer_version, use_null_crypt) |
| self._SetInboundMarking(netid, iface, True) |
| self._SetupTunnelNetwork(tunnel, True) |
| |
| return tunnel |
| |
| def tearDownTunnel(self, tunnel): |
| self._SetInboundMarking(tunnel.netid, tunnel.iface, False) |
| self._SetupTunnelNetwork(tunnel, False) |
| tunnel.Teardown() |
| |
| def _TestTunnel(self, inner_version, outer_version, func, use_null_crypt): |
| try: |
| tunnel = self.setUpTunnel(outer_version, use_null_crypt) |
| |
| # Verify functionality before migration |
| local_inner = tunnel.addrs[inner_version] |
| remote_inner = _GetRemoteInnerAddress(inner_version) |
| func(tunnel, inner_version, local_inner, remote_inner) |
| |
| # Migrate tunnel |
| # TODO:b/169170981 Add tests that migrate 4 -> 6 and 6 -> 4 |
| new_underlying_netid = self.RandomNetid(exclude=tunnel.underlying_netid) |
| new_local = self.MyAddress(outer_version, new_underlying_netid) |
| new_remote = net_test.IPV4_ADDR2 if outer_version == 4 else net_test.IPV6_ADDR2 |
| |
| tunnel.Migrate(new_underlying_netid, new_local, new_remote) |
| |
| # Verify functionality after migration |
| func(tunnel, inner_version, local_inner, remote_inner) |
| finally: |
| self.tearDownTunnel(tunnel) |
| |
| def ParamTestMigrateXfrmIntfInput(self, inner_version, outer_version): |
| self._TestTunnel(inner_version, outer_version, self._CheckTunnelInput, True) |
| |
| def ParamTestMigrateXfrmIntfOutput(self, inner_version, outer_version): |
| self._TestTunnel(inner_version, outer_version, self._CheckTunnelOutput, |
| True) |
| |
| def ParamTestMigrateXfrmIntfInOutEncrypted(self, inner_version, outer_version): |
| self._TestTunnel(inner_version, outer_version, self._CheckTunnelEncryption, |
| False) |
| |
| def ParamTestMigrateXfrmIntfIcmp(self, inner_version, outer_version): |
| self._TestTunnel(inner_version, outer_version, self._CheckTunnelIcmp, False) |
| |
| def ParamTestMigrateXfrmIntfEncryptionWithIcmp(self, inner_version, outer_version): |
| self._TestTunnel(inner_version, outer_version, |
| self._CheckTunnelEncryptionWithIcmp, False) |
| |
| def ParamTestMigrateXfrmIntfRekey(self, inner_version, outer_version): |
| self._TestTunnel(inner_version, outer_version, self._CheckTunnelRekey, |
| True) |
| |
| if __name__ == "__main__": |
| InjectTests() |
| unittest.main() |