Refactor VTI tests to support null encryption
-Split input and output into separate tests
-Refactor to inject null encrypted packets while still allowing
encrypted packets to be mirrored
-Add a check to ensure plaintext packets are not sent.
Bug: 66467511
Test: this
Change-Id: I6cad55f65d32fae9ea17fb44ea796bdcd4aba3a3
diff --git a/net/test/xfrm_tunnel_test.py b/net/test/xfrm_tunnel_test.py
index e71b46b..652a0c2 100755
--- a/net/test/xfrm_tunnel_test.py
+++ b/net/test/xfrm_tunnel_test.py
@@ -56,20 +56,22 @@
HAVE_XFRM_INTERFACES = HaveXfrmInterfaces()
-# Parameters to Set up VTI as a special network
-_BASE_VTI_NETID = {4: 40, 6: 60}
+# Parameters to setup tunnels as special networks
+_TUNNEL_NETID_OFFSET = 0xFC00 # Matches reserved netid range for IpSecService
+_BASE_TUNNEL_NETID = {4: 40, 6: 60}
_BASE_VTI_OKEY = 2000000100
_BASE_VTI_IKEY = 2000000200
-_VTI_NETID = 50
-_VTI_IFNAME = "test_vti"
-
_TEST_OUT_SPI = 0x1234
_TEST_IN_SPI = _TEST_OUT_SPI
_TEST_OKEY = 2000000100
_TEST_IKEY = 2000000200
+_TEST_REMOTE_PORT = 1234
+
+_SCAPY_IP_TYPE = {4: scapy.IP, 6: scapy.IPv6}
+
def _GetLocalInnerAddress(version):
return {4: "10.16.5.15", 6: "2001:db8:1::1"}[version]
@@ -83,8 +85,50 @@
return {4: net_test.IPV4_ADDR, 6: net_test.IPV6_ADDR}[version]
+def _GetNullAuthCryptTunnelModePkt(inner_version, src_inner, src_outer,
+ src_port, dst_inner, dst_outer,
+ dst_port, spi, seq_num, ip_hdr_options={}):
+ ip_hdr_options.update({'src': src_inner, 'dst': dst_inner})
+
+ # Build and receive an ESP packet destined for the inner socket
+ IpType = {4: scapy.IP, 6: scapy.IPv6}[inner_version]
+ input_pkt = (
+ IpType(**ip_hdr_options) / scapy.UDP(sport=src_port, dport=dst_port) /
+ net_test.UDP_PAYLOAD)
+ input_pkt = IpType(str(input_pkt)) # Compute length, checksum.
+ input_pkt = xfrm_base.EncryptPacketWithNull(input_pkt, spi, seq_num,
+ (src_outer, dst_outer))
+
+ return input_pkt
+
+
+def _CreateReceiveSock(version, port=0):
+ # Create a socket to receive packets.
+ read_sock = socket(net_test.GetAddressFamily(version), SOCK_DGRAM, 0)
+ read_sock.bind((net_test.GetWildcardAddress(version), port))
+ # The second parameter of the tuple is the port number regardless of AF.
+ local_port = read_sock.getsockname()[1]
+ # Guard against the eventuality of the receive failing.
+ net_test.SetNonBlocking(read_sock.fileno())
+
+ return read_sock, local_port
+
+
+def _SendPacket(testInstance, netid, version, remote, remote_port):
+ # Send a packet out via the tunnel-backed network, bound for the port number
+ # of the input socket.
+ write_sock = socket(net_test.GetAddressFamily(version), SOCK_DGRAM, 0)
+ testInstance.SelectInterface(write_sock, netid, "mark")
+ write_sock.sendto(net_test.UDP_PAYLOAD, (remote, remote_port))
+ local_port = write_sock.getsockname()[1]
+
+ return local_port
+
+
def InjectTests():
InjectParameterizedTests(XfrmTunnelTest)
+ InjectParameterizedTests(XfrmInterfaceTest)
+ InjectParameterizedTests(XfrmVtiTest)
def InjectParameterizedTests(cls):
@@ -115,20 +159,15 @@
local_port = read_sock.getsockname()[1]
# Build and receive an ESP packet destined for the inner socket
- IpType = {4: scapy.IP, 6: scapy.IPv6}[inner_version]
- input_pkt = (
- IpType(src=remote_inner, dst=local_inner) / scapy.UDP(
- sport=1234, dport=local_port) / net_test.UDP_PAYLOAD)
- input_pkt = IpType(str(input_pkt)) # Compute length, checksum.
- input_pkt = xfrm_base.EncryptPacketWithNull(input_pkt, _TEST_IN_SPI, 1,
- (remote_outer, local_outer))
+ input_pkt = _GetNullAuthCryptTunnelModePkt(
+ inner_version, remote_inner, remote_outer, _TEST_REMOTE_PORT,
+ local_inner, local_outer, local_port, _TEST_IN_SPI, 1)
self.ReceivePacketOn(underlying_netid, input_pkt)
# Verify that the packet data and src are correct
data, src = read_sock.recvfrom(4096)
self.assertEquals(net_test.UDP_PAYLOAD, data)
- self.assertEquals(remote_inner, src[0])
- self.assertEquals(1234, src[1])
+ self.assertEquals((remote_inner, _TEST_REMOTE_PORT), src[:2])
def _TestTunnel(self, inner_version, outer_version, func, direction):
"""Test a unidirectional XFRM Tunnel with explicit selectors"""
@@ -160,10 +199,7 @@
write_sock = socket(net_test.GetAddressFamily(inner_version), SOCK_DGRAM, 0)
self.SelectInterface(write_sock, netid, "mark")
- read_sock = socket(net_test.GetAddressFamily(inner_version), SOCK_DGRAM, 0)
- read_sock.bind((net_test.GetWildcardAddress(inner_version), 0))
- # Guard against the eventuality of the receive failing.
- net_test.SetNonBlocking(read_sock.fileno())
+ read_sock, _ = _CreateReceiveSock(inner_version)
sock = write_sock if direction == xfrm.XFRM_POLICY_OUT else read_sock
func(inner_version, outer_version, u_netid, netid, local_inner,
@@ -197,38 +233,37 @@
netid = self.RandomNetid()
local_addr = self.MyAddress(version, netid)
self.iproute.CreateVirtualTunnelInterface(
- dev_name=_VTI_IFNAME,
+ dev_name=_TEST_XFRM_IFNAME,
local_addr=local_addr,
remote_addr=_GetRemoteOuterAddress(version),
o_key=_TEST_OKEY,
i_key=_TEST_IKEY)
- self._VerifyVtiInfoData(self.iproute.GetIfinfoData(_VTI_IFNAME),
- version, local_addr,
- _GetRemoteOuterAddress(version),
- _TEST_IKEY, _TEST_OKEY)
+ self._VerifyVtiInfoData(
+ self.iproute.GetIfinfoData(_TEST_XFRM_IFNAME), version, local_addr,
+ _GetRemoteOuterAddress(version), _TEST_IKEY, _TEST_OKEY)
new_remote_addr = {4: net_test.IPV4_ADDR2, 6: net_test.IPV6_ADDR2}
- new_okey = _TEST_OKEY + _VTI_NETID
- new_ikey = _TEST_IKEY + _VTI_NETID
+ new_okey = _TEST_OKEY + _TEST_XFRM_IF_ID
+ new_ikey = _TEST_IKEY + _TEST_XFRM_IF_ID
self.iproute.CreateVirtualTunnelInterface(
- dev_name=_VTI_IFNAME,
+ dev_name=_TEST_XFRM_IFNAME,
local_addr=local_addr,
remote_addr=new_remote_addr[version],
o_key=new_okey,
i_key=new_ikey,
is_update=True)
- self._VerifyVtiInfoData(self.iproute.GetIfinfoData(_VTI_IFNAME),
- version, local_addr, new_remote_addr[version],
- new_ikey, new_okey)
+ self._VerifyVtiInfoData(
+ self.iproute.GetIfinfoData(_TEST_XFRM_IFNAME), version, local_addr,
+ new_remote_addr[version], new_ikey, new_okey)
- if_index = self.iproute.GetIfIndex(_VTI_IFNAME)
+ if_index = self.iproute.GetIfIndex(_TEST_XFRM_IFNAME)
# Validate that the netlink interface matches the ioctl interface.
- self.assertEquals(net_test.GetInterfaceIndex(_VTI_IFNAME), if_index)
- self.iproute.DeleteLink(_VTI_IFNAME)
+ self.assertEquals(net_test.GetInterfaceIndex(_TEST_XFRM_IFNAME), if_index)
+ self.iproute.DeleteLink(_TEST_XFRM_IFNAME)
with self.assertRaises(IOError):
- self.iproute.GetIfIndex(_VTI_IFNAME)
+ self.iproute.GetIfIndex(_TEST_XFRM_IFNAME)
def _QuietDeleteLink(self, ifname):
try:
@@ -239,59 +274,94 @@
def tearDown(self):
super(XfrmAddDeleteVtiTest, self).tearDown()
- self._QuietDeleteLink(_VTI_IFNAME)
+ self._QuietDeleteLink(_TEST_XFRM_IFNAME)
-class VtiInterface(object):
+class SaInfo(object):
- def __init__(self, iface, netid, underlying_netid, _, local, remote):
+ def __init__(self, spi):
+ self.spi = spi
+ self.seq_num = 1
+
+
+class IpSecBaseInterface(object):
+
+ def __init__(self, iface, netid, underlying_netid, local, remote, version):
self.iface = iface
self.netid = netid
self.underlying_netid = underlying_netid
self.local, self.remote = local, remote
+
+ # XFRM interfaces technically do not have a version. This keeps track of
+ # the IP version of the local and remote addresses.
+ self.version = version
self.rx = self.tx = 0
- self.ikey = _TEST_IKEY + netid
- self.okey = _TEST_OKEY + netid
- self.out_spi = self.in_spi = random.randint(0, 0x7fffffff)
+ self.addrs = {}
self.iproute = iproute.IPRoute()
self.xfrm = xfrm.Xfrm()
- self.SetupInterface()
- self.SetupXfrm()
- self.addrs = {}
-
def Teardown(self):
self.TeardownXfrm()
self.TeardownInterface()
+ def TeardownInterface(self):
+ self.iproute.DeleteLink(self.iface)
+
+ def SetupXfrm(self, use_null_crypt):
+ rand_spi = random.randint(0, 0x7fffffff)
+ self.in_sa = SaInfo(rand_spi)
+ self.out_sa = SaInfo(rand_spi)
+
+ # Select algorithms:
+ if use_null_crypt:
+ auth, crypt = xfrm_base._ALGO_AUTH_NULL, xfrm_base._ALGO_CRYPT_NULL
+ else:
+ auth, crypt = xfrm_base._ALGO_HMAC_SHA1, xfrm_base._ALGO_CBC_AES_256
+
+ self._SetupXfrmByType(auth, crypt)
+
+ def TeardownXfrm(self):
+ raise NotImplementedError("Subclasses should implement this")
+
+ def _SetupXfrmByType(self, auth_algo, crypt_algo):
+ raise NotImplementedError("Subclasses should implement this")
+
+
+class VtiInterface(IpSecBaseInterface):
+
+ def __init__(self, iface, netid, underlying_netid, _, local, remote, version):
+ super(VtiInterface, self).__init__(iface, netid, underlying_netid, local,
+ remote, version)
+
+ self.ikey = _TEST_IKEY + netid
+ self.okey = _TEST_OKEY + netid
+
+ self.SetupInterface()
+ self.SetupXfrm(False)
+
def SetupInterface(self):
return self.iproute.CreateVirtualTunnelInterface(
self.iface, self.local, self.remote, self.ikey, self.okey)
- def TeardownInterface(self):
- self.iproute.DeleteLink(self.iface)
-
- def SetupXfrm(self):
+ def _SetupXfrmByType(self, auth_algo, crypt_algo):
# For the VTI, the selectors are wildcard since packets will only
# be selected if they have the appropriate mark, hence the inner
# addresses are wildcard.
self.xfrm.CreateTunnel(xfrm.XFRM_POLICY_OUT, None, self.local, self.remote,
- self.out_spi, xfrm_base._ALGO_CBC_AES_256,
- xfrm_base._ALGO_HMAC_SHA1,
+ self.out_sa.spi, crypt_algo, auth_algo,
xfrm.ExactMatchMark(self.okey),
self.underlying_netid, None)
self.xfrm.CreateTunnel(xfrm.XFRM_POLICY_IN, None, self.remote, self.local,
- self.in_spi, xfrm_base._ALGO_CBC_AES_256,
- xfrm_base._ALGO_HMAC_SHA1,
+ self.in_sa.spi, crypt_algo, auth_algo,
xfrm.ExactMatchMark(self.ikey), None, None)
def TeardownXfrm(self):
self.xfrm.DeleteTunnel(xfrm.XFRM_POLICY_OUT, None, self.remote,
- self.out_spi, self.okey, None)
+ self.out_sa.spi, self.okey, None)
self.xfrm.DeleteTunnel(xfrm.XFRM_POLICY_IN, None, self.local,
- self.in_spi, self.ikey, None)
+ self.in_sa.spi, self.ikey, None)
@unittest.skipUnless(HAVE_XFRM_INTERFACES, "XFRM interfaces unsupported")
@@ -311,53 +381,36 @@
self.iproute.GetIfIndex(_TEST_XFRM_IFNAME)
-class XfrmInterface(object):
+class XfrmInterface(IpSecBaseInterface):
- def __init__(self, iface, netid, underlying_netid, ifindex, local, remote):
- self.iface = iface
- self.netid = netid
- self.underlying_netid = underlying_netid
+ def __init__(self, iface, netid, underlying_netid, ifindex, local, remote,
+ version):
+ super(XfrmInterface, self).__init__(iface, netid, underlying_netid, local,
+ remote, version)
+
self.ifindex = ifindex
- self.local, self.remote = local, remote
- self.rx = self.tx = 0
self.xfrm_if_id = netid
- self.out_spi = self.in_spi = random.randint(0, 0x7fffffff)
- self.xfrm_if_id = self.netid
-
- self.iproute = iproute.IPRoute()
- self.xfrm = xfrm.Xfrm()
self.SetupInterface()
- self.SetupXfrm()
- self.addrs = {}
-
- def Teardown(self):
- self.TeardownXfrm()
- self.TeardownInterface()
+ self.SetupXfrm(False)
def SetupInterface(self):
"""Create an XFRM interface."""
return self.iproute.CreateXfrmInterface(self.iface, self.netid, self.ifindex)
- def TeardownInterface(self):
- self.iproute.DeleteLink(self.iface)
-
- def SetupXfrm(self):
+ def _SetupXfrmByType(self, auth_algo, crypt_algo):
self.xfrm.CreateTunnel(xfrm.XFRM_POLICY_OUT, None, self.local, self.remote,
- self.out_spi, xfrm_base._ALGO_CBC_AES_256,
- xfrm_base._ALGO_HMAC_SHA1, None,
+ self.out_sa.spi, crypt_algo, auth_algo, None,
self.underlying_netid, self.xfrm_if_id)
self.xfrm.CreateTunnel(xfrm.XFRM_POLICY_IN, None, self.remote, self.local,
- self.in_spi, xfrm_base._ALGO_CBC_AES_256,
- xfrm_base._ALGO_HMAC_SHA1,
- None, None, self.xfrm_if_id)
-
+ self.in_sa.spi, crypt_algo, auth_algo, None, None,
+ self.xfrm_if_id)
def TeardownXfrm(self):
self.xfrm.DeleteTunnel(xfrm.XFRM_POLICY_OUT, None, self.remote,
- self.out_spi, None, self.xfrm_if_id)
+ self.out_sa.spi, None, self.xfrm_if_id)
self.xfrm.DeleteTunnel(xfrm.XFRM_POLICY_IN, None, self.local,
- self.in_spi, None, self.xfrm_if_id)
+ self.in_sa.spi, None, self.xfrm_if_id)
@@ -366,43 +419,54 @@
@classmethod
def setUpClass(cls):
xfrm_base.XfrmBaseTest.setUpClass()
- # VTI interfaces use marks extensively, so configure realistic packet
+ # Tunnel interfaces use marks extensively, so configure realistic packet
# marking rules to make the test representative, make PMTUD work, etc.
cls.SetInboundMarks(True)
cls.SetMarkReflectSysctls(1)
- cls.tunnels = {}
+ # Group by tunnel version to ensure that we test at least one IPv4 and one
+ # IPv6 tunnel
+ cls.tunnelsV4 = {}
+ cls.tunnelsV6 = {}
for i, underlying_netid in enumerate(cls.tuns):
for version in 4, 6:
- netid = _BASE_VTI_NETID[version] + i
+ netid = _BASE_TUNNEL_NETID[version] + _TUNNEL_NETID_OFFSET + i
iface = "ipsec%s" % netid
local = cls.MyAddress(version, underlying_netid)
if version == 4:
- remote = net_test.IPV4_ADDR2 if (i % 2) else net_test.IPV4_ADDR
+ remote = (net_test.IPV4_ADDR if (i % 2) else net_test.IPV4_ADDR2)
else:
- remote = net_test.IPV6_ADDR2 if (i % 2) else net_test.IPV6_ADDR
+ remote = (net_test.IPV6_ADDR if (i % 2) else net_test.IPV6_ADDR2)
+
ifindex = cls.ifindices[underlying_netid]
-
tunnel = cls.INTERFACE_CLASS(iface, netid, underlying_netid, ifindex,
- local, remote)
-
+ local, remote, version)
cls._SetInboundMarking(netid, iface, True)
cls._SetupTunnelNetwork(tunnel, True)
- cls.tunnels[netid] = tunnel
+
+ if version == 4:
+ cls.tunnelsV4[netid] = tunnel
+ else:
+ cls.tunnelsV6[netid] = tunnel
@classmethod
def tearDownClass(cls):
# The sysctls are restored by MultinetworkBaseTest.tearDownClass.
cls.SetInboundMarks(False)
- for tunnel in cls.tunnels.values():
+ for tunnel in cls.tunnelsV4.values() + cls.tunnelsV6.values():
cls._SetInboundMarking(tunnel.netid, tunnel.iface, False)
cls._SetupTunnelNetwork(tunnel, False)
tunnel.Teardown()
xfrm_base.XfrmBaseTest.tearDownClass()
+ def randomTunnel(self, outer_version):
+ version_dict = self.tunnelsV4 if outer_version == 4 else self.tunnelsV6
+ return random.choice(version_dict.values())
+
def setUp(self):
multinetwork_base.MultiNetworkBaseTest.setUp(self)
self.iproute = iproute.IPRoute()
+ self.xfrm = xfrm.Xfrm()
def tearDown(self):
multinetwork_base.MultiNetworkBaseTest.tearDown(self)
@@ -423,6 +487,13 @@
net_test.AddressLengthBits(version), ifindex)
@classmethod
+ def _GetLocalAddress(cls, version, netid):
+ if version == 4:
+ return cls._MyIPv4Address(netid - _TUNNEL_NETID_OFFSET)
+ else:
+ return cls.OnlinkPrefix(6, netid - _TUNNEL_NETID_OFFSET) + "1"
+
+ @classmethod
def _SetupTunnelNetwork(cls, tunnel, is_add):
"""Setup rules and routes for a tunnel Network.
@@ -461,10 +532,7 @@
table, cls.PRIORITY_FWMARK)
# Configure IP addresses.
- if version == 4:
- addr = cls._MyIPv4Address(tunnel.netid)
- else:
- addr = cls.OnlinkPrefix(6, tunnel.netid) + "1"
+ addr = cls._GetLocalAddress(version, tunnel.netid)
prefixlen = net_test.AddressLengthBits(version)
tunnel.addrs[version] = addr
if is_add:
@@ -474,83 +542,149 @@
cls.iproute.DelRoute(version, table, "default", 0, None, ifindex)
cls.iproute.DelAddress(addr, prefixlen, ifindex)
- def assertReceivedPacket(self, tunnel):
+ def assertReceivedPacket(self, tunnel, sa_info):
tunnel.rx += 1
self.assertEquals((tunnel.rx, tunnel.tx),
self.iproute.GetRxTxPackets(tunnel.iface))
+ sa_info.seq_num += 1
- def assertSentPacket(self, tunnel):
+ def assertSentPacket(self, tunnel, sa_info):
tunnel.tx += 1
self.assertEquals((tunnel.rx, tunnel.tx),
self.iproute.GetRxTxPackets(tunnel.iface))
+ sa_info.seq_num += 1
- # TODO: Should we completely re-write this using null encryption and null
- # authentication? We could then assemble and disassemble packets for each
- # direction individually. This approach would improve debuggability, avoid the
- # complexity of the twister, and allow the test to more-closely validate
- # deployable configurations.
- def _CheckTunnelInputOutput(self, tunnel, inner_version):
- local_outer = tunnel.local
- remote_outer = tunnel.remote
+ def _CheckTunnelInput(self, tunnel, inner_version, local_inner, remote_inner,
+ sa_info=None):
+ """Test null-crypt input path over an IPsec interface."""
+ if sa_info is None:
+ sa_info = tunnel.in_sa
+ read_sock, local_port = _CreateReceiveSock(inner_version)
- # Create a socket to receive packets.
- read_sock = socket(
- net_test.GetAddressFamily(inner_version), SOCK_DGRAM, 0)
- read_sock.bind((net_test.GetWildcardAddress(inner_version), 0))
- # The second parameter of the tuple is the port number regardless of AF.
- port = read_sock.getsockname()[1]
- # Guard against the eventuality of the receive failing.
- net_test.SetNonBlocking(read_sock.fileno())
+ input_pkt = _GetNullAuthCryptTunnelModePkt(
+ inner_version, remote_inner, tunnel.remote, _TEST_REMOTE_PORT,
+ local_inner, tunnel.local, local_port, sa_info.spi, sa_info.seq_num)
+ self.ReceivePacketOn(tunnel.underlying_netid, input_pkt)
- # Send a packet out via the tunnel-backed network, bound for the port number
- # of the input socket.
- write_sock = socket(
- net_test.GetAddressFamily(inner_version), SOCK_DGRAM, 0)
- self.SelectInterface(write_sock, tunnel.netid, "mark")
- write_sock.sendto(net_test.UDP_PAYLOAD,
- (_GetRemoteInnerAddress(inner_version), port))
+ # Verify that the packet data and src are correct
+ self.assertReceivedPacket(tunnel, sa_info)
+ data, src = read_sock.recvfrom(4096)
+ self.assertEquals(net_test.UDP_PAYLOAD, data)
+ self.assertEquals((remote_inner, _TEST_REMOTE_PORT), src[:2])
+
+ def _CheckTunnelOutput(self, tunnel, inner_version, local_inner,
+ remote_inner, sa_info=None):
+ """Test null-crypt output path over an IPsec interface."""
+ if sa_info is None:
+ sa_info = tunnel.out_sa
+ local_port = _SendPacket(self, tunnel.netid, inner_version, remote_inner,
+ _TEST_REMOTE_PORT)
# Read a tunneled IP packet on the underlying (outbound) network
# verifying that it is an ESP packet.
- self.assertSentPacket(tunnel)
- pkt = self._ExpectEspPacketOn(tunnel.underlying_netid, tunnel.out_spi, tunnel.tx, None,
- local_outer, remote_outer)
+ pkt = self._ExpectEspPacketOn(tunnel.underlying_netid, sa_info.spi,
+ sa_info.seq_num, None, tunnel.local,
+ tunnel.remote)
- # Perform an address switcheroo so that the inner address of the remote
- # end of the tunnel is now the address on the local tunnel interface; this
- # way, the twisted inner packet finds a destination via the tunnel once
- # decrypted.
- remote = _GetRemoteInnerAddress(inner_version)
- local = tunnel.addrs[inner_version]
- self._SwapInterfaceAddress(tunnel.iface, new_addr=remote, old_addr=local)
+ # Get and update the IP headers on the inner payload so that we can do a simple
+ # comparison of byte data. Unfortunately, due to the scapy version this runs on,
+ # we cannot parse past the ESP header to the inner IP header, and thus have to
+ # workaround in this manner
+ if inner_version == 4:
+ ip_hdr_options = {
+ 'id': scapy.IP(str(pkt.payload)[8:]).id,
+ 'flags': scapy.IP(str(pkt.payload)[8:]).flags
+ }
+ else:
+ ip_hdr_options = {'fl': scapy.IPv6(str(pkt.payload)[8:]).fl}
+
+ expected = _GetNullAuthCryptTunnelModePkt(
+ inner_version, local_inner, tunnel.local, local_port, remote_inner,
+ tunnel.remote, _TEST_REMOTE_PORT, sa_info.spi, sa_info.seq_num,
+ ip_hdr_options)
+
+ # Check outer header manually (Avoids having to overwrite outer header's
+ # id, flags or flow label)
+ self.assertSentPacket(tunnel, sa_info)
+ self.assertEquals(expected.src, pkt.src)
+ self.assertEquals(expected.dst, pkt.dst)
+ self.assertEquals(len(expected), len(pkt))
+
+ # Check everything else
+ self.assertEquals(str(expected.payload), str(pkt.payload))
+
+ def _CheckTunnelEncryption(self, tunnel, inner_version, local_inner,
+ remote_inner):
+ """Test both input and output paths over an encrypted IPsec interface.
+
+ This tests specifically makes sure that the both encryption and decryption
+ work together, as opposed to the _CheckTunnel(Input|Output) where the
+ input and output paths are tested separately, and using null encryption.
+ """
+ src_port = _SendPacket(self, tunnel.netid, inner_version, remote_inner,
+ _TEST_REMOTE_PORT)
+
+ # Make sure it appeared on the underlying interface
+ pkt = self._ExpectEspPacketOn(tunnel.underlying_netid, tunnel.out_sa.spi,
+ tunnel.out_sa.seq_num, None, tunnel.local,
+ tunnel.remote)
+
+ # Check that packet is not sent in plaintext
+ self.assertTrue(str(net_test.UDP_PAYLOAD) not in str(pkt))
+
+ # Check src/dst
+ self.assertEquals(tunnel.local, pkt.src)
+ self.assertEquals(tunnel.remote, pkt.dst)
+
+ # Check that the interface statistics recorded the outbound packet
+ self.assertSentPacket(tunnel, tunnel.out_sa)
+
try:
- # Swap the packet's IP headers and write it back to the
- # underlying network.
- pkt = TunTwister.TwistPacket(pkt)
- self.ReceivePacketOn(tunnel.underlying_netid, pkt)
- self.assertReceivedPacket(tunnel)
- # Receive the decrypted packet on the dest port number.
- read_packet = read_sock.recv(4096)
- self.assertEquals(read_packet, net_test.UDP_PAYLOAD)
- finally:
- # Unwind the switcheroo
- self._SwapInterfaceAddress(tunnel.iface, new_addr=local, old_addr=remote)
+ # Swap the interface addresses to pretend we are the remote
+ self._SwapInterfaceAddress(
+ tunnel.iface, new_addr=remote_inner, old_addr=local_inner)
+ # Swap the packet's IP headers and write it back to the underlying
+ # network.
+ pkt = TunTwister.TwistPacket(pkt)
+ read_sock, local_port = _CreateReceiveSock(inner_version,
+ _TEST_REMOTE_PORT)
+ self.ReceivePacketOn(tunnel.underlying_netid, pkt)
+
+ # Verify that the packet data and src are correct
+ data, src = read_sock.recvfrom(4096)
+ self.assertEquals(net_test.UDP_PAYLOAD, data)
+ self.assertEquals((local_inner, src_port), src[:2])
+
+ # Check that the interface statistics recorded the inbound packet
+ self.assertReceivedPacket(tunnel, tunnel.in_sa)
+ finally:
+ # Swap the interface addresses to pretend we are the remote
+ self._SwapInterfaceAddress(
+ tunnel.iface, new_addr=local_inner, old_addr=remote_inner)
+
+ def _CheckTunnelIcmp(self, tunnel, inner_version, local_inner, remote_inner,
+ sa_info=None):
+ """Test ICMP error path over an IPsec interface."""
+ if sa_info is None:
+ sa_info = tunnel.out_sa
# Now attempt to provoke an ICMP error.
# TODO: deduplicate with multinetwork_test.py.
- version = net_test.GetAddressVersion(tunnel.remote)
dst_prefix, intermediate = {
4: ("172.19.", "172.16.9.12"),
6: ("2001:db8::", "2001:db8::1")
- }[version]
+ }[tunnel.version]
- write_sock.sendto(net_test.UDP_PAYLOAD,
- (_GetRemoteInnerAddress(inner_version), port))
- self.assertSentPacket(tunnel)
- pkt = self._ExpectEspPacketOn(tunnel.underlying_netid, tunnel.out_spi, tunnel.tx, None,
- local_outer, remote_outer)
- myaddr = self.MyAddress(version, tunnel.underlying_netid)
- _, toobig = packets.ICMPPacketTooBig(version, intermediate, myaddr, pkt)
+ local_port = _SendPacket(self, tunnel.netid, inner_version, remote_inner,
+ _TEST_REMOTE_PORT)
+ pkt = self._ExpectEspPacketOn(tunnel.underlying_netid, sa_info.spi,
+ sa_info.seq_num, None, tunnel.local,
+ tunnel.remote)
+ self.assertSentPacket(tunnel, sa_info)
+
+ myaddr = self.MyAddress(tunnel.version, tunnel.underlying_netid)
+ _, toobig = packets.ICMPPacketTooBig(tunnel.version, intermediate, myaddr,
+ pkt)
self.ReceivePacketOn(tunnel.underlying_netid, toobig)
# Check that the packet too big reduced the MTU.
@@ -561,14 +695,35 @@
self.assertEquals(packets.PTB_MTU, attributes["RTA_METRICS"]["RTAX_MTU"])
# Clear PMTU information so that future tests don't have to worry about it.
- self.InvalidateDstCache(version, tunnel.underlying_netid)
+ self.InvalidateDstCache(tunnel.version, tunnel.underlying_netid)
- def CheckTunnelInputOutput(self):
- """Test packet input and output over a Virtual Tunnel Interface."""
- for i in xrange(3 * len(self.tunnels.values())):
- tunnel = random.choice(self.tunnels.values())
- self._CheckTunnelInputOutput(tunnel, 4)
- self._CheckTunnelInputOutput(tunnel, 6)
+ def _CheckTunnelEncryptionWithIcmp(self, tunnel, inner_version, local_inner,
+ remote_inner):
+ """Test combined encryption path with ICMP errors over an IPsec tunnel"""
+ self._CheckTunnelEncryption(tunnel, inner_version, local_inner,
+ remote_inner)
+ self._CheckTunnelIcmp(tunnel, inner_version, local_inner, remote_inner)
+ self._CheckTunnelEncryption(tunnel, inner_version, local_inner,
+ remote_inner)
+
+ def _TestTunnel(self, inner_version, outer_version, func, use_null_crypt):
+ """Bootstrap method to setup and run tests for the given parameters."""
+ tunnel = self.randomTunnel(outer_version)
+
+ try:
+ tunnel.TeardownXfrm()
+ tunnel.SetupXfrm(use_null_crypt)
+
+ local_inner = tunnel.addrs[inner_version]
+ remote_inner = _GetRemoteInnerAddress(inner_version)
+
+ # Run twice to ensure sequence numbers are tested
+ for i in range(2):
+ func(tunnel, inner_version, local_inner, remote_inner)
+ finally:
+ if use_null_crypt:
+ tunnel.TeardownXfrm()
+ tunnel.SetupXfrm(False)
@unittest.skipUnless(net_test.LINUX_VERSION >= (3, 18, 0), "VTI Unsupported")
@@ -576,8 +731,23 @@
INTERFACE_CLASS = VtiInterface
- def testVtiInputOutput(self):
- self.CheckTunnelInputOutput()
+ def ParamTestVtiInput(self, inner_version, outer_version):
+ self._TestTunnel(inner_version, outer_version, self._CheckTunnelInput, True)
+
+ def ParamTestVtiOutput(self, inner_version, outer_version):
+ self._TestTunnel(inner_version, outer_version, self._CheckTunnelOutput,
+ True)
+
+ def ParamTestVtiInOutEncrypted(self, inner_version, outer_version):
+ self._TestTunnel(inner_version, outer_version, self._CheckTunnelEncryption,
+ False)
+
+ def ParamTestVtiIcmp(self, inner_version, outer_version):
+ self._TestTunnel(inner_version, outer_version, self._CheckTunnelIcmp, False)
+
+ def ParamTestVtiEncryptionWithIcmp(self, inner_version, outer_version):
+ self._TestTunnel(inner_version, outer_version,
+ self._CheckTunnelEncryptionWithIcmp, False)
@unittest.skipUnless(HAVE_XFRM_INTERFACES, "XFRM interfaces unsupported")
@@ -585,8 +755,23 @@
INTERFACE_CLASS = XfrmInterface
- def testXfrmiInputOutput(self):
- self.CheckTunnelInputOutput()
+ def ParamTestXfrmIntfInput(self, inner_version, outer_version):
+ self._TestTunnel(inner_version, outer_version, self._CheckTunnelInput, True)
+
+ def ParamTestXfrmIntfOutput(self, inner_version, outer_version):
+ self._TestTunnel(inner_version, outer_version, self._CheckTunnelOutput,
+ True)
+
+ def ParamTestXfrmIntfInOutEncrypted(self, inner_version, outer_version):
+ self._TestTunnel(inner_version, outer_version, self._CheckTunnelEncryption,
+ False)
+
+ def ParamTestXfrmIntfIcmp(self, inner_version, outer_version):
+ self._TestTunnel(inner_version, outer_version, self._CheckTunnelIcmp, False)
+
+ def ParamTestXfrmIntfEncryptionWithIcmp(self, inner_version, outer_version):
+ self._TestTunnel(inner_version, outer_version,
+ self._CheckTunnelEncryptionWithIcmp, False)
if __name__ == "__main__":