blob: 3a3d9b0d36a904b1cc4a8539d7d6c40cbd20b8d9 [file] [log] [blame]
#!/usr/bin/python
#
# Copyright 2017 The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# pylint: disable=g-bad-todo,g-bad-file-header,wildcard-import
from errno import * # pylint: disable=wildcard-import
from scapy import all as scapy
from socket import * # pylint: disable=wildcard-import
import struct
import subprocess
import threading
import unittest
import csocket
import cstruct
import multinetwork_base
import net_test
import packets
import xfrm
import xfrm_base
# Hex-encoded payload bytes. NOTE(review): not referenced anywhere in the
# visible part of this file -- confirm whether it is still needed.
ENCRYPTED_PAYLOAD = ("b1c74998efd6326faebe2061f00f2c750e90e76001664a80c287b150"
                     "59e74bf949769cc6af71e51b539e7de3a2a14cb05a231b969e035174"
                     "d98c5aa0cef1937db98889ec0d08fa408fecf616")

# IPv6 addresses used as SA endpoints by the tests below.
TEST_ADDR1 = "2001:4860:4860::8888"
TEST_ADDR2 = "2001:4860:4860::8844"

# IP addresses to use for tunnel endpoints. For generality, these should be
# different from the addresses we send packets to.
# Keyed by IP version (4 or 6).
TUNNEL_ENDPOINTS = {4: "8.8.4.4", 6: TEST_ADDR2}

# SPIs used by the tests; two distinct values so that inbound and outbound SAs
# (or old and rekeyed SAs) can be told apart.
TEST_SPI = 0x1234
TEST_SPI2 = 0x1235
class XfrmFunctionalTest(xfrm_base.XfrmLazyTest):
def assertIsUdpEncapEsp(self, packet, spi, seq, length):
  """Asserts that packet looks like UDP-encapsulated ESP.

  Checks the IP protocol field (the IPv4 `proto` field is read), the UDP
  destination port (4500), the length of the UDP layer, and the ESP header
  parsed from the start of the UDP payload.

  Args:
    packet: The captured packet; must contain a scapy.UDP layer.
    spi: The SPI expected in the ESP header.
    seq: The sequence number expected in the ESP header.
    length: The expected length of the UDP layer, as measured by len().
  """
  self.assertEquals(IPPROTO_UDP, packet.proto)

  udp_layer = packet[scapy.UDP]
  self.assertEquals(4500, udp_layer.dport)
  self.assertEquals(length, len(udp_layer))

  raw_udp_payload = str(udp_layer.payload)
  parsed_esp, _unused_rest = cstruct.Read(raw_udp_payload, xfrm.EspHdr)
  # FIXME: this file currently swaps SPI byte order manually, so SPI needs to
  # be double-swapped here.
  expected_esp = xfrm.EspHdr((spi, seq))
  self.assertEquals(expected_esp, parsed_esp)
def CreateNewSa(self, localAddr, remoteAddr, spi, reqId, encap_tmpl,
                null_auth=False):
  """Adds a transport mode SA encrypted with xfrm_base._ALGO_CBC_AES_256.

  Args:
    localAddr: Source address of the SA.
    remoteAddr: Destination address of the SA.
    spi: The SPI of the SA.
    reqId: The request ID of the SA.
    encap_tmpl: Encapsulation template passed through to AddSaInfo, or None.
    null_auth: If True, use xfrm_base._ALGO_AUTH_NULL for authentication;
      otherwise use xfrm_base._ALGO_HMAC_SHA1.
  """
  if null_auth:
    authentication = xfrm_base._ALGO_AUTH_NULL
  else:
    authentication = xfrm_base._ALGO_HMAC_SHA1

  encryption = xfrm_base._ALGO_CBC_AES_256
  self.xfrm.AddSaInfo(
      localAddr, remoteAddr, spi, xfrm.XFRM_MODE_TRANSPORT, reqId,
      encryption, authentication, None, encap_tmpl, None, None)
def testAddSa(self):
  """Adds an SA and compares it with the output of `ip xfrm state`.

  The SA is deleted again in all cases once it has been created, including
  when running `ip` fails, so that a failure here does not leave state behind
  for later tests.
  """
  self.CreateNewSa("::", TEST_ADDR1, TEST_SPI, 3320, None)
  try:
    expected = (
        "src :: dst 2001:4860:4860::8888\n"
        "\tproto esp spi 0x00001234 reqid 3320 mode transport\n"
        "\treplay-window 4 \n"
        "\tauth-trunc hmac(sha1) 0x%s 96\n"
        "\tenc cbc(aes) 0x%s\n"
        "\tsel src ::/0 dst ::/0 \n" % (
            xfrm_base._AUTHENTICATION_KEY_128.encode("hex"),
            xfrm_base._ENCRYPTION_KEY_256.encode("hex")))

    actual = subprocess.check_output("ip xfrm state".split())
    # Newer versions of IP also show anti-replay context. Don't choke if it's
    # missing.
    actual = actual.replace(
        "\tanti-replay context: seq 0x0, oseq 0x0, bitmap 0x00000000\n", "")

    self.assertMultiLineEqual(expected, actual)
  finally:
    self.xfrm.DeleteSaInfo(TEST_ADDR1, TEST_SPI, IPPROTO_ESP)
def testFlush(self):
  """Checks that FlushSaInfo removes every SA, both IPv4 and IPv6."""
  def sa_count():
    # Number of SAs currently reported by a dump.
    return len(self.xfrm.DumpSaInfo())

  self.assertEquals(0, sa_count())

  # One IPv6 SA and one IPv4 SA.
  self.CreateNewSa("::", "2000::", TEST_SPI, 1234, None)
  self.CreateNewSa("0.0.0.0", "192.0.2.1", TEST_SPI, 4321, None)
  self.assertEquals(2, sa_count())

  self.xfrm.FlushSaInfo()
  self.assertEquals(0, sa_count())
def _TestSocketPolicy(self, version):
  """Exercises an outbound per-socket XFRM policy on a connected UDP socket.

  The steps, in order: send cleartext before any policy exists; apply an
  outbound socket policy and expect EAGAIN because no SA matches; add a
  matching SA and expect an ESP packet; check that another destination and
  another socket are unaffected by the SA/policy respectively; delete the SA
  and expect EAGAIN again; clear the policy (twice) and expect cleartext.

  Args:
    version: 4, 6 or 5. For 5 the "::ffff:" prefix is stripped from the
      socket's addresses and the XFRM policy and SA are expressed in IPv4
      terms (xfrm_version is 4).
  """
  # Open a UDP socket and connect it.
  family = net_test.GetAddressFamily(version)
  s = socket(family, SOCK_DGRAM, 0)
  netid = self.RandomNetid()
  self.SelectInterface(s, netid, "mark")
  remotesockaddr = self.GetRemoteSocketAddress(version)
  s.connect((remotesockaddr, 53))
  saddr, sport = s.getsockname()[:2]
  daddr, dport = s.getpeername()[:2]
  if version == 5:
    # Turn the IPv4-mapped addresses into plain IPv4 addresses.
    saddr = saddr.replace("::ffff:", "")
    daddr = daddr.replace("::ffff:", "")
  reqid = 0
  # Baseline: with no policy applied, the packet goes out in the clear.
  desc, pkt = packets.UDP(version, saddr, daddr, sport=sport)
  s.sendto(net_test.UDP_PAYLOAD, (remotesockaddr, 53))
  self.ExpectPacketOn(netid, "Send after socket, expected %s" % desc, pkt)
  # Using IPv4 XFRM on a dual-stack socket requires setting an AF_INET policy
  # that's written in terms of IPv4 addresses.
  xfrm_version = 4 if version == 5 else version
  xfrm_family = net_test.GetAddressFamily(xfrm_version)
  xfrm_base.ApplySocketPolicy(s, xfrm_family, xfrm.XFRM_POLICY_OUT,
                              TEST_SPI, reqid, None)
  # Because the policy has level set to "require" (the default), attempting
  # to send a packet results in an error, because there is no SA that
  # matches the socket policy we set.
  self.assertRaisesErrno(
      EAGAIN,
      s.sendto, net_test.UDP_PAYLOAD, (remotesockaddr, 53))
  # If there is a user space key manager, calling sendto() after applying the
  # socket policy creates an SA whose state is XFRM_STATE_ACQ. So this just
  # deletes it. If there is no user space key manager, deleting SA returns
  # ESRCH as the error code.
  try:
    self.xfrm.DeleteSaInfo(self.GetRemoteAddress(xfrm_version), TEST_SPI, IPPROTO_ESP)
  except IOError as e:
    self.assertEquals(ESRCH, e.errno, "Unexpected error when deleting ACQ SA")
  # Adding a matching SA causes the packet to go out encrypted. The SA's
  # SPI must match the one in our template, and the destination address must
  # match the packet's destination address (in tunnel mode, it has to match
  # the tunnel destination).
  self.CreateNewSa(
      net_test.GetWildcardAddress(xfrm_version),
      self.GetRemoteAddress(xfrm_version), TEST_SPI, reqid, None)
  s.sendto(net_test.UDP_PAYLOAD, (remotesockaddr, 53))
  expected_length = xfrm_base.GetEspPacketLength(xfrm.XFRM_MODE_TRANSPORT,
                                                 version, False,
                                                 net_test.UDP_PAYLOAD,
                                                 xfrm_base._ALGO_HMAC_SHA1,
                                                 xfrm_base._ALGO_CBC_AES_256)
  # First packet on the new SA, hence sequence number 1.
  self._ExpectEspPacketOn(netid, TEST_SPI, 1, expected_length, None, None)
  # Sending to another destination doesn't work: again, no matching SA.
  remoteaddr2 = self.GetOtherRemoteSocketAddress(version)
  self.assertRaisesErrno(
      EAGAIN,
      s.sendto, net_test.UDP_PAYLOAD, (remoteaddr2, 53))
  # Sending on another socket without the policy applied results in an
  # unencrypted packet going out.
  s2 = socket(family, SOCK_DGRAM, 0)
  self.SelectInterface(s2, netid, "mark")
  s2.sendto(net_test.UDP_PAYLOAD, (remotesockaddr, 53))
  pkts = self.ReadAllPacketsOn(netid)
  self.assertEquals(1, len(pkts))
  packet = pkts[0]
  # The next-protocol field is named differently in IPv6 and IPv4 headers.
  protocol = packet.nh if version == 6 else packet.proto
  self.assertEquals(IPPROTO_UDP, protocol)
  # Deleting the SA causes the first socket to return errors again.
  self.xfrm.DeleteSaInfo(self.GetRemoteAddress(xfrm_version), TEST_SPI,
                         IPPROTO_ESP)
  self.assertRaisesErrno(
      EAGAIN,
      s.sendto, net_test.UDP_PAYLOAD, (remotesockaddr, 53))
  # Clear the socket policy and expect a cleartext packet.
  xfrm_base.SetPolicySockopt(s, family, None)
  s.sendto(net_test.UDP_PAYLOAD, (remotesockaddr, 53))
  self.ExpectPacketOn(netid, "Send after clear, expected %s" % desc, pkt)
  # Clearing the policy twice is safe.
  xfrm_base.SetPolicySockopt(s, family, None)
  s.sendto(net_test.UDP_PAYLOAD, (remotesockaddr, 53))
  self.ExpectPacketOn(netid, "Send after clear 2, expected %s" % desc, pkt)
  # Clearing if a policy was never set is safe.
  # NOTE(review): this socket is always AF_INET6 while `family` follows
  # `version`, so for version 4 the two differ -- confirm this is intended.
  s = socket(AF_INET6, SOCK_DGRAM, 0)
  xfrm_base.SetPolicySockopt(s, family, None)
def testSocketPolicyIPv4(self):
  """Runs the socket policy test on an IPv4 socket."""
  version = 4
  self._TestSocketPolicy(version)
def testSocketPolicyIPv6(self):
  """Runs the socket policy test on an IPv6 socket."""
  version = 6
  self._TestSocketPolicy(version)
def testSocketPolicyMapped(self):
  """Runs the socket policy test with version 5 (IPv4-mapped addresses)."""
  version = 5
  self._TestSocketPolicy(version)
# Sets up sockets and marks to correct netid
def _SetupUdpEncapSockets(self):
  """Creates the sockets used by the UDP encapsulation tests.

  Returns:
    A tuple (netid, myaddr, remoteaddr, encap_sock, encap_port, s) where
    encap_sock is an IPv4 UDP socket bound to myaddr with the ESP-in-UDP
    encapsulation option set, encap_port is the port it was bound to, and s
    is an IPv4 UDP socket marked for netid and connected to remoteaddr:53.
  """
  netid = self.RandomNetid()
  local_address = self.MyAddress(4, netid)
  remote_address = self.GetRemoteAddress(4)

  # Reserve a port on which to receive UDP encapsulated packets. Sending
  # packets works without this (and potentially can send packets with a source
  # port belonging to another application), but receiving requires the port to
  # be bound and the encapsulation socket option enabled.
  encap_socket = net_test.Socket(AF_INET, SOCK_DGRAM, 0)
  encap_socket.bind((local_address, 0))
  bound_port = encap_socket.getsockname()[1]
  encap_socket.setsockopt(IPPROTO_UDP, xfrm.UDP_ENCAP,
                          xfrm.UDP_ENCAP_ESPINUDP)

  # The socket that the test traffic is sent from.
  traffic_socket = socket(AF_INET, SOCK_DGRAM, 0)
  self.SelectInterface(traffic_socket, netid, "mark")
  traffic_socket.connect((remote_address, 53))

  return (netid, local_address, remote_address, encap_socket, bound_port,
          traffic_socket)
# Sets up SAs and applies socket policy to given socket
def _SetupUdpEncapSaPair(self, myaddr, remoteaddr, in_spi, out_spi,
                         encap_port, s, use_null_auth):
  """Creates UDP-encapsulated SAs in both directions and policies on s.

  Args:
    myaddr: Local address; source of the outbound SA.
    remoteaddr: Remote address; destination of the outbound SA.
    in_spi: SPI of the inbound SA.
    out_spi: SPI of the outbound SA.
    encap_port: Local UDP encapsulation port, in host byte order.
    s: The socket to apply the inbound and outbound policies to.
    use_null_auth: Passed to CreateNewSa as null_auth for both SAs.
  """
  outbound_reqid = 456
  inbound_reqid = 123

  # Outbound SA: encapsulate from our encap port to port 4500.
  tmpl = xfrm.XfrmEncapTmpl(
      (xfrm.UDP_ENCAP_ESPINUDP, htons(encap_port), htons(4500), 16 * "\x00"))
  self.CreateNewSa(myaddr, remoteaddr, out_spi, outbound_reqid, tmpl,
                   use_null_auth)

  # Inbound SA: same template with the two ports exchanged.
  previous_sport = tmpl.sport
  tmpl.sport = tmpl.dport
  tmpl.dport = previous_sport
  self.CreateNewSa(remoteaddr, myaddr, in_spi, inbound_reqid, tmpl,
                   use_null_auth)

  # Apply socket policies to s.
  xfrm_base.ApplySocketPolicy(
      s, AF_INET, xfrm.XFRM_POLICY_OUT, out_spi, outbound_reqid, None)
  # TODO: why does this work without a per-socket policy applied?
  # The received packet obviously matches an SA, but don't inbound packets
  # need to match a policy as well? (b/71541609)
  xfrm_base.ApplySocketPolicy(
      s, AF_INET, xfrm.XFRM_POLICY_IN, in_spi, inbound_reqid, None)
  # Uncomment for debugging.
  # subprocess.call("ip xfrm state".split())
# Check that packets can be sent and received.
def _VerifyUdpEncapSocket(self, netid, remoteaddr, myaddr, encap_port, sock,
                          in_spi, out_spi, null_auth, seq_num):
  """Sends a packet on sock and replays the resulting ESP packet to ourselves.

  Args:
    netid: The netid on which packets are captured and injected.
    remoteaddr: The remote address that sock sends to.
    myaddr: The local address, used as destination of the replayed packet.
    encap_port: The local UDP encapsulation port.
    sock: The socket that has the socket policies applied.
    in_spi: SPI written into the replayed packet (the inbound SA's SPI).
    out_spi: SPI expected on the packet that the kernel sends.
    null_auth: Whether the SAs use null authentication.
    seq_num: The ESP sequence number expected on the sent packet; it is also
      used for the replayed packet.
  """
  # Now send a packet.
  sock.sendto(net_test.UDP_PAYLOAD, (remoteaddr, 53))
  srcport = sock.getsockname()[1]

  # Expect to see an UDP encapsulated packet.
  pkts = self.ReadAllPacketsOn(netid)
  self.assertEquals(1, len(pkts))
  packet = pkts[0]
  auth_algo = (
      xfrm_base._ALGO_AUTH_NULL if null_auth else xfrm_base._ALGO_HMAC_SHA1)
  expected_len = xfrm_base.GetEspPacketLength(
      xfrm.XFRM_MODE_TRANSPORT, 4, True, net_test.UDP_PAYLOAD, auth_algo,
      xfrm_base._ALGO_CBC_AES_256)
  self.assertIsUdpEncapEsp(packet, out_spi, seq_num, expected_len)

  # Now test the receive path. Because we don't know how to decrypt packets,
  # we just play back the encrypted packet that kernel sent earlier. We swap
  # the addresses in the IP header to make the packet look like it's bound for
  # us, but we can't do that for the port numbers because the UDP header is
  # part of the integrity protected payload, which we can only replay as is.
  # So the source and destination ports are swapped and the packet appears to
  # be sent from srcport to port 53. Open another socket on that port.
  # NOTE(review): no inbound policy is applied to twisted_socket here; the
  # inbound policy set up by the caller is on `sock`.
  twisted_socket = socket(AF_INET, SOCK_DGRAM, 0)
  try:
    csocket.SetSocketTimeout(twisted_socket, 100)
    twisted_socket.bind(("0.0.0.0", 53))

    # Save the payload of the packet so we can replay it back to ourselves,
    # and replace the SPI with our inbound SPI. The first 8 bytes of the IP
    # payload (the UDP header) are dropped; scapy rebuilds that header below.
    payload = str(packet.payload)[8:]
    spi_seq = xfrm.EspHdr((in_spi, seq_num)).Pack()
    payload = spi_seq + payload[len(spi_seq):]

    sainfo = self.xfrm.FindSaInfo(in_spi)
    start_integrity_failures = sainfo.stats.integrity_failed

    # Now play back the valid packet and check that we receive it.
    incoming = (scapy.IP(src=remoteaddr, dst=myaddr) /
                scapy.UDP(sport=4500, dport=encap_port) / payload)
    incoming = scapy.IP(str(incoming))
    self.ReceivePacketOn(netid, incoming)

    # Re-read the SA so that the statistics reflect the injected packet.
    sainfo = self.xfrm.FindSaInfo(in_spi)

    # TODO: break this out into a separate test
    # If our SPIs are different, and we aren't using null authentication,
    # we expect the packet to be dropped. We also expect that the integrity
    # failure counter to increase, as SPIs are part of the authenticated or
    # integrity-verified portion of the packet.
    if not null_auth and in_spi != out_spi:
      self.assertRaisesErrno(EAGAIN, twisted_socket.recv, 4096)
      self.assertEquals(start_integrity_failures + 1,
                        sainfo.stats.integrity_failed)
    else:
      data, src = twisted_socket.recvfrom(4096)
      self.assertEquals(net_test.UDP_PAYLOAD, data)
      self.assertEquals((remoteaddr, srcport), src)
      self.assertEquals(start_integrity_failures,
                        sainfo.stats.integrity_failed)

    # Check that unencrypted packets on twisted_socket are not received.
    # NOTE(review): `unencrypted` is built but never injected (there is no
    # ReceivePacketOn call for it), so the assertion below only shows that
    # nothing else is queued on the socket. Left as is: whether the kernel
    # would drop the packet depends on the inbound policy question noted
    # above -- confirm before injecting it.
    unencrypted = (
        scapy.IP(src=remoteaddr, dst=myaddr) / scapy.UDP(
            sport=srcport, dport=53) / net_test.UDP_PAYLOAD)
    self.assertRaisesErrno(EAGAIN, twisted_socket.recv, 4096)
  finally:
    # Release port 53 deterministically; this method is called several times
    # per test and a failed assertion must not leave the port bound.
    twisted_socket.close()
def _RunEncapSocketPolicyTest(self, in_spi, out_spi, use_null_auth):
  """Sets up UDP encap sockets and SAs, then verifies one send and receive.

  Args:
    in_spi: SPI of the inbound SA.
    out_spi: SPI of the outbound SA.
    use_null_auth: Whether the SAs use null authentication.
  """
  sockets = self._SetupUdpEncapSockets()
  # encap_sock is not used directly, but a reference to it is held for the
  # duration of the test.
  netid, myaddr, remoteaddr, encap_sock, encap_port, s = sockets

  self._SetupUdpEncapSaPair(
      myaddr, remoteaddr, in_spi, out_spi, encap_port, s, use_null_auth)

  # Check that UDP encap sockets work with socket policy and given SAs
  first_seq_num = 1
  self._VerifyUdpEncapSocket(
      netid, remoteaddr, myaddr, encap_port, s, in_spi, out_spi,
      use_null_auth, first_seq_num)
# TODO: Add tests for ESP (non-encap) sockets.
def testUdpEncapSameSpisNullAuth(self):
  """UDP encap with identical inbound/outbound SPIs and null authentication."""
  # Use the same SPI both inbound and outbound because this lets us receive
  # encrypted packets by simply replaying the packets the kernel sends
  # without having to disable authentication
  spi = TEST_SPI
  self._RunEncapSocketPolicyTest(spi, spi, True)
def testUdpEncapSameSpis(self):
  """UDP encap with identical inbound/outbound SPIs and authentication."""
  spi = TEST_SPI
  self._RunEncapSocketPolicyTest(spi, spi, False)
def testUdpEncapDifferentSpisNullAuth(self):
  """UDP encap with different inbound/outbound SPIs and null authentication."""
  inbound_spi, outbound_spi = TEST_SPI, TEST_SPI2
  self._RunEncapSocketPolicyTest(inbound_spi, outbound_spi, True)
def testUdpEncapDifferentSpis(self):
  """UDP encap with different inbound/outbound SPIs and authentication."""
  inbound_spi, outbound_spi = TEST_SPI, TEST_SPI2
  self._RunEncapSocketPolicyTest(inbound_spi, outbound_spi, False)
def testUdpEncapRekey(self):
  """Rekeys a UDP encap socket from TEST_SPI to TEST_SPI2, make-before-break."""
  # Select the two SPIs that will be used
  old_spi, new_spi = TEST_SPI, TEST_SPI2

  # Setup sockets
  sockets = self._SetupUdpEncapSockets()
  netid, myaddr, remoteaddr, encap_sock, encap_port, s = sockets

  def verify(in_spi, out_spi, seq_num):
    # Every verification in this test uses null authentication.
    self._VerifyUdpEncapSocket(netid, remoteaddr, myaddr, encap_port, s,
                               in_spi, out_spi, True, seq_num)

  # The SAs must use null authentication, since we change SPIs on the fly
  # Without null authentication, this would result in an ESP authentication
  # error since the SPI is part of the authenticated section. The packet
  # would then be dropped
  self._SetupUdpEncapSaPair(myaddr, remoteaddr, old_spi, old_spi,
                            encap_port, s, True)

  # Check that UDP encap sockets work with socket policy and given SAs
  verify(old_spi, old_spi, 1)

  # Rekey this socket using the make-before-break paradigm. First we create
  # new SAs, update the per-socket policies, and only then remove the old SAs
  #
  # This allows us to switch to the new SA without breaking the outbound path.
  self._SetupUdpEncapSaPair(myaddr, remoteaddr, new_spi, new_spi,
                            encap_port, s, True)

  # Check that UDP encap socket works with updated socket policy, sending
  # using new SA, but receiving on both old and new SAs
  verify(new_spi, new_spi, 1)
  verify(old_spi, new_spi, 2)

  # Delete old SAs
  for sa_destination in (remoteaddr, myaddr):
    self.xfrm.DeleteSaInfo(sa_destination, old_spi, IPPROTO_ESP)

  # Check that UDP encap socket works with updated socket policy and new SAs
  verify(new_spi, new_spi, 3)
def testAllocSpecificSpi(self):
  """Allocates an SPI from a range of size one and checks its value."""
  wanted_spi = 0xABCD
  allocated = self.xfrm.AllocSpi("::", IPPROTO_ESP, wanted_spi, wanted_spi)
  self.assertEquals(wanted_spi, allocated.id.spi)
def testAllocSpecificSpiUnavailable(self):
  """Attempt to allocate the same SPI twice."""
  wanted_spi = 0xABCD

  # The first allocation succeeds.
  allocated = self.xfrm.AllocSpi("::", IPPROTO_ESP, wanted_spi, wanted_spi)
  self.assertEquals(wanted_spi, allocated.id.spi)

  # The second allocation of the same SPI is expected to fail with ENOENT.
  with self.assertRaisesErrno(ENOENT):
    self.xfrm.AllocSpi("::", IPPROTO_ESP, wanted_spi, wanted_spi)
def testAllocRangeSpi(self):
  """Allocates an SPI from a range and checks that it lies in the range."""
  lowest = 0xABCD0
  highest = 0xABCDF
  allocated = self.xfrm.AllocSpi("::", IPPROTO_ESP, lowest, highest)
  allocated_spi = allocated.id.spi
  self.assertGreaterEqual(allocated_spi, lowest)
  self.assertLessEqual(allocated_spi, highest)
def testAllocRangeSpiUnavailable(self):
  """Attempt to allocate N+1 SPIs from a range of size N."""
  lowest, highest = 0xABCD0, 0xABCDF
  attempts = (highest - lowest + 1) + 1
  seen_spis = set()

  # Assert that allocating SPI fails when none are available.
  with self.assertRaisesErrno(ENOENT):
    # Allocating range_size + 1 SPIs is guaranteed to fail. Due to the way
    # kernel picks random SPIs, this has a high probability of failing before
    # reaching that limit.
    for _ in xrange(attempts):
      allocated = self.xfrm.AllocSpi("::", IPPROTO_ESP, lowest, highest)
      allocated_spi = allocated.id.spi
      # The same SPI must never be handed out twice.
      self.assertNotIn(allocated_spi, seen_spis)
      seen_spis.add(allocated_spi)
def testSocketPolicyDstCacheV6(self):
  """Runs the destination cache test over IPv6."""
  version = 6
  self._TestSocketPolicyDstCache(version)
def testSocketPolicyDstCacheV4(self):
  """Runs the destination cache test over IPv4."""
  version = 4
  self._TestSocketPolicyDstCache(version)
def _TestSocketPolicyDstCache(self, version):
  """Test that destination cache is cleared with socket policy.

  This relies on the fact that connect() on a UDP socket populates the
  destination cache.

  Args:
    version: 4 or 6, the IP version of the socket under test.
  """
  family = net_test.GetAddressFamily(version)
  netid = self.RandomNetid()

  # Create the UDP socket and mark it for the chosen netid.
  sock = socket(family, SOCK_DGRAM, 0)
  self.SelectInterface(sock, netid, "mark")

  # Connecting populates the socket's destination cache.
  destination = (self.GetRemoteAddress(version), 53)
  sock.connect(destination)

  # Apply a policy to the socket. Should clear dst cache.
  xfrm_base.ApplySocketPolicy(
      sock, family, xfrm.XFRM_POLICY_OUT, TEST_SPI, 123, None)

  # Policy with no matching SA should result in EAGAIN. If destination cache
  # failed to clear, then the UDP packet will be sent normally.
  with self.assertRaisesErrno(EAGAIN):
    sock.send(net_test.UDP_PAYLOAD)
  self.ExpectNoPacketsOn(netid, "Packet not blocked by policy")
def _CheckNullEncryptionTunnelMode(self, version):
  """Sends and receives UDP through null-encrypted tunnel mode SAs.

  An ESP packet is assembled by hand, injected, and read from the socket.
  Then a packet is sent from the socket, captured, and disassembled by hand.

  Args:
    version: 4 or 6, the IP version of both the inner and outer packets.
  """
  out_spi, out_reqid = 0xABCD, 123
  in_spi, in_reqid = 0x9876, 456
  input_payload = "input hello"
  output_payload = "output hello"
  udp_header_len = 8

  family = net_test.GetAddressFamily(version)
  netid = self.RandomNetid()
  local_addr = self.MyAddress(version, netid)
  remote_addr = self.GetRemoteAddress(version)

  # Borrow the address of another netId as the source address of the tunnel
  tun_local = self.MyAddress(version, self.RandomNetid(netid))
  # For generality, pick a tunnel endpoint that's not the address we
  # connect the socket to.
  tun_remote = TUNNEL_ENDPOINTS[version]

  # Output. The last argument is netid so that the tunnelled packet is sent
  # on the network we capture on.
  self.xfrm.AddSaInfo(
      tun_local, tun_remote, out_spi, xfrm.XFRM_MODE_TUNNEL, out_reqid,
      xfrm_base._ALGO_CRYPT_NULL, xfrm_base._ALGO_AUTH_NULL,
      None, None, None, netid)
  # Input
  self.xfrm.AddSaInfo(
      tun_remote, tun_local, in_spi, xfrm.XFRM_MODE_TUNNEL, in_reqid,
      xfrm_base._ALGO_CRYPT_NULL, xfrm_base._ALGO_AUTH_NULL,
      None, None, None, None)

  sock = net_test.UDPSocket(family)
  self.SelectInterface(sock, netid, "mark")
  sock.bind((local_addr, 0))
  local_port = sock.getsockname()[1]
  remote_port = 5555

  xfrm_base.ApplySocketPolicy(
      sock, family, xfrm.XFRM_POLICY_OUT, out_spi, out_reqid,
      (tun_local, tun_remote))
  xfrm_base.ApplySocketPolicy(
      sock, family, xfrm.XFRM_POLICY_IN, in_spi, in_reqid,
      (tun_remote, tun_local))

  # Create and receive an ESP packet.
  IpType = {4: scapy.IP, 6: scapy.IPv6}[version]
  input_pkt = (IpType(src=remote_addr, dst=local_addr) /
               scapy.UDP(sport=remote_port, dport=local_port) /
               input_payload)
  input_pkt = IpType(str(input_pkt))  # Compute length, checksum.
  input_pkt = xfrm_base.EncryptPacketWithNull(input_pkt, in_spi,
                                              1, (tun_remote, tun_local))
  self.ReceivePacketOn(netid, input_pkt)
  msg, addr = sock.recvfrom(1024)
  self.assertEquals(input_payload, msg)
  self.assertEquals((remote_addr, remote_port), addr[:2])

  # Send and capture a packet.
  sock.sendto(output_payload, (remote_addr, remote_port))
  # Named sent_pkts rather than packets so that the imported packets module
  # is not shadowed.
  sent_pkts = self.ReadAllPacketsOn(netid)
  self.assertEquals(1, len(sent_pkts))
  output_pkt = sent_pkts[0]
  output_pkt, esp_hdr = xfrm_base.DecryptPacketWithNull(output_pkt)
  # length of the payload plus the UDP header
  self.assertEquals(output_pkt[scapy.UDP].len,
                    len(output_payload) + udp_header_len)
  self.assertEquals(remote_addr, output_pkt.dst)
  self.assertEquals(remote_port, output_pkt[scapy.UDP].dport)
  self.assertEquals(output_payload, str(output_pkt[scapy.UDP].payload))
  self.assertEquals(out_spi, esp_hdr.spi)
def testNullEncryptionTunnelMode(self):
  """Verify null encryption in tunnel mode.

  This test verifies both manual assembly and disassembly of UDP packets
  with ESP in IPsec tunnel mode.
  """
  # IPv4 first, then IPv6.
  self._CheckNullEncryptionTunnelMode(4)
  self._CheckNullEncryptionTunnelMode(6)
def _CheckNullEncryptionTransportMode(self, version):
  """Sends and receives UDP through null-encrypted transport mode SAs.

  An ESP packet is assembled by hand, injected, and read from the socket.
  Then a packet is sent from the socket, captured, and disassembled by hand.

  Args:
    version: 4 or 6, the IP version to test.
  """
  out_spi, out_reqid = 0xABCD, 123
  in_spi, in_reqid = 0x9876, 456
  input_payload = "input hello"
  output_payload = "output hello"
  udp_header_len = 8

  family = net_test.GetAddressFamily(version)
  netid = self.RandomNetid()
  local_addr = self.MyAddress(version, netid)
  remote_addr = self.GetRemoteAddress(version)

  # Output
  self.xfrm.AddSaInfo(
      local_addr, remote_addr, out_spi, xfrm.XFRM_MODE_TRANSPORT, out_reqid,
      xfrm_base._ALGO_CRYPT_NULL, xfrm_base._ALGO_AUTH_NULL,
      None, None, None, None)
  # Input
  self.xfrm.AddSaInfo(
      remote_addr, local_addr, in_spi, xfrm.XFRM_MODE_TRANSPORT, in_reqid,
      xfrm_base._ALGO_CRYPT_NULL, xfrm_base._ALGO_AUTH_NULL,
      None, None, None, None)

  sock = net_test.UDPSocket(family)
  self.SelectInterface(sock, netid, "mark")
  sock.bind((local_addr, 0))
  local_port = sock.getsockname()[1]
  remote_port = 5555

  xfrm_base.ApplySocketPolicy(
      sock, family, xfrm.XFRM_POLICY_OUT, out_spi, out_reqid, None)
  xfrm_base.ApplySocketPolicy(
      sock, family, xfrm.XFRM_POLICY_IN, in_spi, in_reqid, None)

  # Create and receive an ESP packet.
  IpType = {4: scapy.IP, 6: scapy.IPv6}[version]
  input_pkt = (IpType(src=remote_addr, dst=local_addr) /
               scapy.UDP(sport=remote_port, dport=local_port) /
               input_payload)
  input_pkt = IpType(str(input_pkt))  # Compute length, checksum.
  input_pkt = xfrm_base.EncryptPacketWithNull(input_pkt, in_spi, 1, None)
  self.ReceivePacketOn(netid, input_pkt)
  msg, addr = sock.recvfrom(1024)
  self.assertEquals(input_payload, msg)
  self.assertEquals((remote_addr, remote_port), addr[:2])

  # Send and capture a packet.
  sock.sendto(output_payload, (remote_addr, remote_port))
  # Named sent_pkts rather than packets so that the imported packets module
  # is not shadowed.
  sent_pkts = self.ReadAllPacketsOn(netid)
  self.assertEquals(1, len(sent_pkts))
  output_pkt = sent_pkts[0]
  output_pkt, esp_hdr = xfrm_base.DecryptPacketWithNull(output_pkt)
  # length of the payload plus the UDP header
  self.assertEquals(output_pkt[scapy.UDP].len,
                    len(output_payload) + udp_header_len)
  self.assertEquals(remote_addr, output_pkt.dst)
  self.assertEquals(remote_port, output_pkt[scapy.UDP].dport)
  self.assertEquals(output_payload, str(output_pkt[scapy.UDP].payload))
  self.assertEquals(out_spi, esp_hdr.spi)
def testNullEncryptionTransportMode(self):
  """Verify null encryption in transport mode.

  This test verifies both manual assembly and disassembly of UDP packets
  with ESP in IPsec transport mode.
  """
  # IPv4 first, then IPv6.
  self._CheckNullEncryptionTransportMode(4)
  self._CheckNullEncryptionTransportMode(6)
def _CheckGlobalPoliciesByMark(self, version):
  """Tests that global policies may differ by only the mark.

  Args:
    version: 4 or 6, selects the address family of the policy selector.
  """
  family = net_test.GetAddressFamily(version)
  sel = xfrm.EmptySelector(family)

  # Pick 2 arbitrary mark values.
  marks = [
      xfrm.XfrmMark(mark=0xf00, mask=xfrm_base.MARK_MASK_ALL),
      xfrm.XfrmMark(mark=0xf00d, mask=xfrm_base.MARK_MASK_ALL),
  ]

  # The same outbound policy and template are used with both marks.
  policy = xfrm.UserPolicy(xfrm.XFRM_POLICY_OUT, sel)
  tmpl = xfrm.UserTemplate(AF_UNSPEC, 0xfeed, 0, None)

  # Create the policy once per mark; the second add differs only by mark.
  for mark in marks:
    self.xfrm.AddPolicyInfo(policy, tmpl, mark)

  # Delete the policies individually
  for mark in marks:
    self.xfrm.DeletePolicyInfo(sel, xfrm.XFRM_POLICY_OUT, mark)
def testGlobalPoliciesByMarkV4(self):
  """Checks that IPv4 global policies may differ only by mark."""
  version = 4
  self._CheckGlobalPoliciesByMark(version)
def testGlobalPoliciesByMarkV6(self):
  """Checks that IPv6 global policies may differ only by mark."""
  version = 6
  self._CheckGlobalPoliciesByMark(version)
def _CheckUpdatePolicy(self, version):
  """Tests that we can update the template on a policy.

  Args:
    version: 4 or 6, selects the address family of templates and selector.
  """
  family = net_test.GetAddressFamily(version)
  original_tmpl = xfrm.UserTemplate(family, 0xdead, 0, None)
  replacement_tmpl = xfrm.UserTemplate(family, 0xbeef, 0, None)
  sel = xfrm.EmptySelector(family)
  policy = xfrm.UserPolicy(xfrm.XFRM_POLICY_OUT, sel)
  mark = xfrm.XfrmMark(mark=0xf00, mask=xfrm_base.MARK_MASK_ALL)

  def assert_only_policy_has(expected_tmpl):
    """Dump the SPD and match a single template on a single policy."""
    policies = self.xfrm.DumpPolicyInfo()
    self.assertEquals(1, len(policies))
    attributes = policies[0][1]
    self.assertEquals(attributes['XFRMA_TMPL'], expected_tmpl)

  # Create a new policy using update.
  self.xfrm.UpdatePolicyInfo(policy, original_tmpl, mark, None)

  # NEWPOLICY will not update the existing policy. This checks both that
  # UPDPOLICY created a policy and that NEWPOLICY will not perform updates.
  assert_only_policy_has(original_tmpl)
  with self.assertRaisesErrno(EEXIST):
    self.xfrm.AddPolicyInfo(policy, replacement_tmpl, mark, None)

  # Update the policy using UPDPOLICY.
  self.xfrm.UpdatePolicyInfo(policy, replacement_tmpl, mark, None)

  # There should only be one policy after update, and it should have the
  # updated template.
  assert_only_policy_has(replacement_tmpl)
def testUpdatePolicyV4(self):
  """Checks policy template updates for IPv4."""
  version = 4
  self._CheckUpdatePolicy(version)
def testUpdatePolicyV6(self):
  """Checks policy template updates for IPv6."""
  version = 6
  self._CheckUpdatePolicy(version)
def _CheckPolicyDifferByDirection(self, version):
  """Tests that policies can differ only by direction.

  Args:
    version: 4 or 6, selects the address family of template and selector.
  """
  family = net_test.GetAddressFamily(version)
  tmpl = xfrm.UserTemplate(family, 0xdead, 0, None)
  sel = xfrm.EmptySelector(family)
  mark = xfrm.XfrmMark(mark=0xf00, mask=xfrm_base.MARK_MASK_ALL)

  # Same selector, template and mark; only the direction changes.
  outbound_policy = xfrm.UserPolicy(xfrm.XFRM_POLICY_OUT, sel)
  self.xfrm.AddPolicyInfo(outbound_policy, tmpl, mark)

  inbound_policy = xfrm.UserPolicy(xfrm.XFRM_POLICY_IN, sel)
  self.xfrm.AddPolicyInfo(inbound_policy, tmpl, mark)
def testPolicyDifferByDirectionV4(self):
  """Checks that IPv4 policies can differ only by direction."""
  version = 4
  self._CheckPolicyDifferByDirection(version)
def testPolicyDifferByDirectionV6(self):
  """Checks that IPv6 policies can differ only by direction."""
  version = 6
  self._CheckPolicyDifferByDirection(version)
class XfrmOutputMarkTest(xfrm_base.XfrmLazyTest):
def _CheckTunnelModeOutputMark(self, version, tunsrc, mark, expected_netid):
  """Tests sending UDP packets to tunnel mode SAs with output marks.

  Opens a UDP socket and binds it to a random netid, then sets up tunnel mode
  SAs with an output_mark of mark and sets a socket policy to use the SA.
  Then checks that sending on those SAs sends a packet on expected_netid,
  or, if expected_netid is None, checks that sending returns ENETUNREACH.

  Args:
    version: 4 or 6.
    tunsrc: A string, the source address of the tunnel.
    mark: An integer, the output_mark to set in the SA.
    expected_netid: An integer, the netid to expect the kernel to send the
        packet on. If None, expect that sendto will fail with ENETUNREACH.
  """
  # Open a UDP socket and bind it to a random netid.
  family = net_test.GetAddressFamily(version)
  sock = socket(family, SOCK_DGRAM, 0)
  self.SelectInterface(sock, self.RandomNetid(), "mark")

  # For generality, pick a tunnel endpoint that's not the address we
  # connect the socket to.
  tundst = TUNNEL_ENDPOINTS[version]

  # Create a tunnel mode SA and use XFRM_OUTPUT_MARK to bind it to netid.
  # The SPI and reqid are derived from the mark.
  spi = TEST_SPI * mark
  reqid = 100 + spi
  self.xfrm.AddSaInfo(
      tunsrc, tundst, spi, xfrm.XFRM_MODE_TUNNEL, reqid,
      xfrm_base._ALGO_CBC_AES_256, xfrm_base._ALGO_HMAC_SHA1,
      None, None, None, mark)

  # Set a socket policy to use it.
  xfrm_base.ApplySocketPolicy(
      sock, family, xfrm.XFRM_POLICY_OUT, spi, reqid, (tunsrc, tundst))

  # Send a packet and check that we see it on the wire.
  destination = (self.GetRemoteAddress(version), 53)
  packetlen = xfrm_base.GetEspPacketLength(
      xfrm.XFRM_MODE_TUNNEL, version, False, net_test.UDP_PAYLOAD,
      xfrm_base._ALGO_HMAC_SHA1, xfrm_base._ALGO_CBC_AES_256)

  if expected_netid is None:
    # No usable network for this mark: the send itself must fail.
    with self.assertRaisesErrno(ENETUNREACH):
      sock.sendto(net_test.UDP_PAYLOAD, destination)
    return

  sock.sendto(net_test.UDP_PAYLOAD, destination)
  self._ExpectEspPacketOn(expected_netid, spi, 1, packetlen, tunsrc, tundst)
def testTunnelModeOutputMarkIPv4(self):
  """For every netid, an output mark of netid sends IPv4 ESP on that netid."""
  version = 4
  for netid in self.NETIDS:
    self._CheckTunnelModeOutputMark(
        version, self.MyAddress(version, netid), netid, netid)
def testTunnelModeOutputMarkIPv6(self):
  """For every netid, an output mark of netid sends IPv6 ESP on that netid."""
  version = 6
  for netid in self.NETIDS:
    self._CheckTunnelModeOutputMark(
        version, self.MyAddress(version, netid), netid, netid)
def testTunnelModeOutputNoMarkIPv4(self):
  """With an output mark of 0, sending over IPv4 is expected to fail."""
  version = 4
  tunnel_source = self.MyAddress(version, self.RandomNetid())
  self._CheckTunnelModeOutputMark(version, tunnel_source, 0, None)
def testTunnelModeOutputNoMarkIPv6(self):
  """With an output mark of 0, sending over IPv6 is expected to fail."""
  version = 6
  tunnel_source = self.MyAddress(version, self.RandomNetid())
  self._CheckTunnelModeOutputMark(version, tunnel_source, 0, None)
def testTunnelModeOutputInvalidMarkIPv4(self):
  """With an output mark of 9999, sending over IPv4 is expected to fail."""
  version = 4
  tunnel_source = self.MyAddress(version, self.RandomNetid())
  self._CheckTunnelModeOutputMark(version, tunnel_source, 9999, None)
def testTunnelModeOutputInvalidMarkIPv6(self):
  """With an output mark of 9999, sending over IPv6 is expected to fail."""
  version = 6
  tunnel_source = self.MyAddress(version, self.RandomNetid())
  self._CheckTunnelModeOutputMark(version, tunnel_source, 9999, None)
def testTunnelModeOutputMarkAttributes(self):
  """Checks that a dumped SA reports the output mark it was created with."""
  output_mark = 1234567
  self.xfrm.AddSaInfo(
      TEST_ADDR1, TUNNEL_ENDPOINTS[6], 0x1234, xfrm.XFRM_MODE_TUNNEL, 100,
      xfrm_base._ALGO_CBC_AES_256, xfrm_base._ALGO_HMAC_SHA1,
      None, None, None, output_mark)

  sa_dump = self.xfrm.DumpSaInfo()
  self.assertEquals(1, len(sa_dump))
  _unused_sainfo, attributes = sa_dump[0]
  self.assertEquals(output_mark, attributes["XFRMA_OUTPUT_MARK"])
def testInvalidAlgorithms(self):
  """Checks that SAs with unknown algorithm names are rejected with ENOSYS."""
  key = "af442892cdcd0ef650e9c299f9a8436a".decode("hex")
  invalid_auth = (xfrm.XfrmAlgoAuth(("invalid(algo)", 128, 96)), key)
  invalid_crypt = (xfrm.XfrmAlgo(("invalid(algo)", 128)), key)

  # Each case pairs one valid algorithm with one invalid algorithm:
  # first an invalid authentication, then an invalid encryption.
  cases = (
      (xfrm_base._ALGO_CBC_AES_256, invalid_auth),
      (invalid_crypt, xfrm_base._ALGO_HMAC_SHA1),
  )
  for crypt_algo, auth_algo in cases:
    with self.assertRaisesErrno(ENOSYS):
      self.xfrm.AddSaInfo(
          TEST_ADDR1, TEST_ADDR2, 0x1234, xfrm.XFRM_MODE_TRANSPORT, 0,
          crypt_algo, auth_algo, None, None, None, 0)
def testUpdateSaAddMark(self):
  """Test that when an SA has no mark, it can be updated to add a mark."""
  spi = 0xABCD
  for version in (4, 6):
    wildcard = net_test.GetWildcardAddress(version)

    # Test that an SA created with ALLOCSPI can be updated with the mark.
    self.xfrm.AllocSpi(wildcard, IPPROTO_ESP, spi, spi)

    mark = xfrm.ExactMatchMark(0xf00d)
    self.xfrm.AddSaInfo(
        wildcard, wildcard, spi, xfrm.XFRM_MODE_TUNNEL, 0,
        xfrm_base._ALGO_CBC_AES_256, xfrm_base._ALGO_HMAC_SHA1,
        None, None, mark, 0, is_update=True)

    sa_dump = self.xfrm.DumpSaInfo()
    self.assertEquals(1, len(sa_dump))  # check that update updated
    _unused_sainfo, attributes = sa_dump[0]
    self.assertEquals(mark, attributes["XFRMA_MARK"])

    # Remove the SA so that the next iteration starts from an empty dump.
    self.xfrm.DeleteSaInfo(wildcard, spi, IPPROTO_ESP, mark)
  # TODO: we might also need to update the mark for a VALID SA.
# Run every test in this module when the file is executed as a script.
if __name__ == "__main__":
  unittest.main()