Move packet definitions to their own file.
Change-Id: I68c57311915e2a8dbdb6842dfb87367485fcf819
diff --git a/net/test/multinetwork_test.py b/net/test/multinetwork_test.py
index 34ee365..a31c5f0 100755
--- a/net/test/multinetwork_test.py
+++ b/net/test/multinetwork_test.py
@@ -27,11 +27,7 @@
import iproute
import multinetwork_base
import net_test
-
-PING_IDENT = 0xff19
-PING_PAYLOAD = "foobarbaz"
-PING_SEQ = 3
-PING_TOS = 0x83
+import packets
# For brevity.
UDP_PAYLOAD = net_test.UDP_PAYLOAD
@@ -51,189 +47,6 @@
pass
-class Packets(object):
-
- TCP_FIN = 1
- TCP_SYN = 2
- TCP_RST = 4
- TCP_PSH = 8
- TCP_ACK = 16
-
- TCP_SEQ = 1692871236
- TCP_WINDOW = 14400
-
- @staticmethod
- def RandomPort():
- return random.randint(1025, 65535)
-
- @staticmethod
- def _GetIpLayer(version):
- return {4: scapy.IP, 6: scapy.IPv6}[version]
-
- @staticmethod
- def _SetPacketTos(packet, tos):
- if isinstance(packet, scapy.IPv6):
- packet.tc = tos
- elif isinstance(packet, scapy.IP):
- packet.tos = tos
- else:
- raise ValueError("Can't find ToS Field")
-
- @classmethod
- def UDP(cls, version, srcaddr, dstaddr, sport=0):
- ip = cls._GetIpLayer(version)
- # Can't just use "if sport" because None has meaning (it means unspecified).
- if sport == 0:
- sport = cls.RandomPort()
- return ("UDPv%d packet" % version,
- ip(src=srcaddr, dst=dstaddr) /
- scapy.UDP(sport=sport, dport=53) / UDP_PAYLOAD)
-
- @classmethod
- def UDPWithOptions(cls, version, srcaddr, dstaddr, sport=0):
- if version == 4:
- packet = (scapy.IP(src=srcaddr, dst=dstaddr, ttl=39, tos=0x83) /
- scapy.UDP(sport=sport, dport=53) /
- UDP_PAYLOAD)
- else:
- packet = (scapy.IPv6(src=srcaddr, dst=dstaddr,
- fl=0xbeef, hlim=39, tc=0x83) /
- scapy.UDP(sport=sport, dport=53) /
- UDP_PAYLOAD)
- return ("UDPv%d packet with options" % version, packet)
-
- @classmethod
- def SYN(cls, dport, version, srcaddr, dstaddr, sport=0, seq=TCP_SEQ):
- ip = cls._GetIpLayer(version)
- if sport == 0:
- sport = cls.RandomPort()
- return ("TCP SYN",
- ip(src=srcaddr, dst=dstaddr) /
- scapy.TCP(sport=sport, dport=dport,
- seq=seq, ack=0,
- flags=cls.TCP_SYN, window=cls.TCP_WINDOW))
-
- @classmethod
- def RST(cls, version, srcaddr, dstaddr, packet):
- ip = cls._GetIpLayer(version)
- original = packet.getlayer("TCP")
- return ("TCP RST",
- ip(src=srcaddr, dst=dstaddr) /
- scapy.TCP(sport=original.dport, dport=original.sport,
- ack=original.seq + 1, seq=None,
- flags=cls.TCP_RST | cls.TCP_ACK, window=cls.TCP_WINDOW))
-
- @classmethod
- def SYNACK(cls, version, srcaddr, dstaddr, packet):
- ip = cls._GetIpLayer(version)
- original = packet.getlayer("TCP")
- return ("TCP SYN+ACK",
- ip(src=srcaddr, dst=dstaddr) /
- scapy.TCP(sport=original.dport, dport=original.sport,
- ack=original.seq + 1, seq=None,
- flags=cls.TCP_SYN | cls.TCP_ACK, window=None))
-
- @classmethod
- def ACK(cls, version, srcaddr, dstaddr, packet, payload=""):
- ip = cls._GetIpLayer(version)
- original = packet.getlayer("TCP")
- was_syn_or_fin = (original.flags & (cls.TCP_SYN | cls.TCP_FIN)) != 0
- ack_delta = was_syn_or_fin + len(original.payload)
- desc = "TCP data" if payload else "TCP ACK"
- flags = cls.TCP_ACK | cls.TCP_PSH if payload else cls.TCP_ACK
- return (desc,
- ip(src=srcaddr, dst=dstaddr) /
- scapy.TCP(sport=original.dport, dport=original.sport,
- ack=original.seq + ack_delta, seq=original.ack,
- flags=flags, window=cls.TCP_WINDOW) /
- payload)
-
- @classmethod
- def FIN(cls, version, srcaddr, dstaddr, packet):
- ip = cls._GetIpLayer(version)
- original = packet.getlayer("TCP")
- was_fin = (original.flags & cls.TCP_FIN) != 0
- return ("TCP FIN",
- ip(src=srcaddr, dst=dstaddr) /
- scapy.TCP(sport=original.dport, dport=original.sport,
- ack=original.seq + was_fin, seq=original.ack,
- flags=cls.TCP_ACK | cls.TCP_FIN, window=cls.TCP_WINDOW))
-
- @classmethod
- def GRE(cls, version, srcaddr, dstaddr, proto, packet):
- if version == 4:
- ip = scapy.IP(src=srcaddr, dst=dstaddr, proto=net_test.IPPROTO_GRE)
- else:
- ip = scapy.IPv6(src=srcaddr, dst=dstaddr, nh=net_test.IPPROTO_GRE)
- packet = ip / scapy.GRE(proto=proto) / packet
- return ("GRE packet", packet)
-
- @classmethod
- def ICMPPortUnreachable(cls, version, srcaddr, dstaddr, packet):
- if version == 4:
- # Linux hardcodes the ToS on ICMP errors to 0xc0 or greater because of
- # RFC 1812 4.3.2.5 (!).
- return ("ICMPv4 port unreachable",
- scapy.IP(src=srcaddr, dst=dstaddr, proto=1, tos=0xc0) /
- scapy.ICMPerror(type=3, code=3) / packet)
- else:
- return ("ICMPv6 port unreachable",
- scapy.IPv6(src=srcaddr, dst=dstaddr) /
- scapy.ICMPv6DestUnreach(code=4) / packet)
-
- @classmethod
- def ICMPPacketTooBig(cls, version, srcaddr, dstaddr, packet):
- if version == 4:
- return ("ICMPv4 fragmentation needed",
- scapy.IP(src=srcaddr, dst=dstaddr, proto=1) /
- scapy.ICMPerror(type=3, code=4, unused=1280) / str(packet)[:64])
- else:
- udp = packet.getlayer("UDP")
- udp.payload = str(udp.payload)[:1280-40-8]
- return ("ICMPv6 Packet Too Big",
- scapy.IPv6(src=srcaddr, dst=dstaddr) /
- scapy.ICMPv6PacketTooBig() / str(packet)[:1232])
-
- @classmethod
- def ICMPEcho(cls, version, srcaddr, dstaddr):
- ip = cls._GetIpLayer(version)
- icmp = {4: scapy.ICMP, 6: scapy.ICMPv6EchoRequest}[version]
- packet = (ip(src=srcaddr, dst=dstaddr) /
- icmp(id=PING_IDENT, seq=PING_SEQ) / PING_PAYLOAD)
- cls._SetPacketTos(packet, PING_TOS)
- return ("ICMPv%d echo" % version, packet)
-
- @classmethod
- def ICMPReply(cls, version, srcaddr, dstaddr, packet):
- ip = cls._GetIpLayer(version)
- # Scapy doesn't provide an ICMP echo reply constructor.
- icmpv4_reply = lambda **kwargs: scapy.ICMP(type=0, **kwargs)
- icmp = {4: icmpv4_reply, 6: scapy.ICMPv6EchoReply}[version]
- packet = (ip(src=srcaddr, dst=dstaddr) /
- icmp(id=PING_IDENT, seq=PING_SEQ) / PING_PAYLOAD)
- # IPv6 only started copying the tclass to echo replies in 3.14.
- if version == 4 or net_test.LINUX_VERSION >= (3, 14):
- cls._SetPacketTos(packet, PING_TOS)
- return ("ICMPv%d echo reply" % version, packet)
-
- @classmethod
- def NS(cls, srcaddr, tgtaddr, srcmac):
- solicited = inet_pton(AF_INET6, tgtaddr)
- last3bytes = tuple([ord(b) for b in solicited[-3:]])
- solicited = "ff02::1:ff%02x:%02x%02x" % last3bytes
- packet = (scapy.IPv6(src=srcaddr, dst=solicited) /
- scapy.ICMPv6ND_NS(tgt=tgtaddr) /
- scapy.ICMPv6NDOptSrcLLAddr(lladdr=srcmac))
- return ("ICMPv6 NS", packet)
-
- @classmethod
- def NA(cls, srcaddr, dstaddr, srcmac):
- packet = (scapy.IPv6(src=srcaddr, dst=dstaddr) /
- scapy.ICMPv6ND_NA(tgt=srcaddr, R=0, S=1, O=1) /
- scapy.ICMPv6NDOptDstLLAddr(lladdr=srcmac))
- return ("ICMPv6 NA", packet)
-
-
class InboundMarkingTest(multinetwork_base.MultiNetworkBaseTest):
@classmethod
@@ -283,14 +96,14 @@
myaddr = self.MyAddress(version, netid)
s.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)
- s.bind((myaddr, PING_IDENT))
- net_test.SetSocketTos(s, PING_TOS)
+ s.bind((myaddr, packets.PING_IDENT))
+ net_test.SetSocketTos(s, packets.PING_TOS)
- desc, expected = Packets.ICMPEcho(version, myaddr, dstaddr)
+ desc, expected = packets.ICMPEcho(version, myaddr, dstaddr)
msg = "IPv%d ping: expected %s on %s" % (
version, desc, self.GetInterfaceName(netid))
- s.sendto(packet + PING_PAYLOAD, (dstaddr, 19321))
+ s.sendto(packet + packets.PING_PAYLOAD, (dstaddr, 19321))
self.ExpectPacketOn(netid, msg, expected)
@@ -300,7 +113,7 @@
if version == 6 and dstaddr.startswith("::ffff"):
version = 4
myaddr = self.MyAddress(version, netid)
- desc, expected = Packets.SYN(53, version, myaddr, dstaddr,
+ desc, expected = packets.SYN(53, version, myaddr, dstaddr,
sport=None, seq=None)
# Non-blocking TCP connects always return EINPROGRESS.
@@ -316,7 +129,7 @@
if version == 6 and dstaddr.startswith("::ffff"):
version = 4
myaddr = self.MyAddress(version, netid)
- desc, expected = Packets.UDP(version, myaddr, dstaddr, sport=None)
+ desc, expected = packets.UDP(version, myaddr, dstaddr, sport=None)
msg = "IPv%s UDP %%s: expected %s on %s" % (
version, desc, self.GetInterfaceName(netid))
@@ -336,7 +149,7 @@
inner_version = {4: 6, 6: 4}[version]
inner_src = self.MyAddress(inner_version, netid)
inner_dst = self.GetRemoteAddress(inner_version)
- inner = str(Packets.UDP(inner_version, inner_src, inner_dst, sport=None)[1])
+ inner = str(packets.UDP(inner_version, inner_src, inner_dst, sport=None)[1])
ethertype = {4: net_test.ETH_P_IP, 6: net_test.ETH_P_IPV6}[inner_version]
# A GRE header can be as simple as two zero bytes and the ethertype.
@@ -344,7 +157,7 @@
myaddr = self.MyAddress(version, netid)
s.sendto(packet, (dstaddr, IPPROTO_GRE))
- desc, expected = Packets.GRE(version, myaddr, dstaddr, ethertype, inner)
+ desc, expected = packets.GRE(version, myaddr, dstaddr, ethertype, inner)
msg = "Raw IPv%d GRE with inner IPv%d UDP: expected %s on %s" % (
version, inner_version, desc, self.GetInterfaceName(netid))
self.ExpectPacketOn(netid, msg, expected)
@@ -411,10 +224,10 @@
# Figure out what packets to expect.
unspec = {4: "0.0.0.0", 6: "::"}[version]
- sport = Packets.RandomPort()
+ sport = packets.RandomPort()
s.bind((unspec, sport))
dstaddr = {4: self.IPV4_ADDR, 6: self.IPV6_ADDR}[version]
- desc, expected = Packets.UDP(version, unspec, dstaddr, sport)
+ desc, expected = packets.UDP(version, unspec, dstaddr, sport)
# If we're testing connected sockets, connect the socket on the first
# netid now.
@@ -517,7 +330,7 @@
sport = s.getsockname()[1]
srcaddr = self.MyAddress(version, netid)
- desc, expected = Packets.UDPWithOptions(version, srcaddr, dstaddr,
+ desc, expected = packets.UDPWithOptions(version, srcaddr, dstaddr,
sport=sport)
msg = "IPv%d UDP using pktinfo routing: expected %s on %s" % (
@@ -572,25 +385,25 @@
self._ReceiveAndExpectResponse(netid, packet, reply, msg)
def SYNToClosedPort(self, *args):
- return Packets.SYN(999, *args)
+ return packets.SYN(999, *args)
def testIPv4ICMPErrorsReflectMark(self):
- self.CheckReflection(4, Packets.UDP, Packets.ICMPPortUnreachable)
+ self.CheckReflection(4, packets.UDP, packets.ICMPPortUnreachable)
def testIPv6ICMPErrorsReflectMark(self):
- self.CheckReflection(6, Packets.UDP, Packets.ICMPPortUnreachable)
+ self.CheckReflection(6, packets.UDP, packets.ICMPPortUnreachable)
def testIPv4PingRepliesReflectMarkAndTos(self):
- self.CheckReflection(4, Packets.ICMPEcho, Packets.ICMPReply)
+ self.CheckReflection(4, packets.ICMPEcho, packets.ICMPReply)
def testIPv6PingRepliesReflectMarkAndTos(self):
- self.CheckReflection(6, Packets.ICMPEcho, Packets.ICMPReply)
+ self.CheckReflection(6, packets.ICMPEcho, packets.ICMPReply)
def testIPv4RSTsReflectMark(self):
- self.CheckReflection(4, self.SYNToClosedPort, Packets.RST)
+ self.CheckReflection(4, self.SYNToClosedPort, packets.RST)
def testIPv6RSTsReflectMark(self):
- self.CheckReflection(6, self.SYNToClosedPort, Packets.RST)
+ self.CheckReflection(6, self.SYNToClosedPort, packets.RST)
class TCPAcceptTest(InboundMarkingTest):
@@ -633,7 +446,7 @@
def CheckTCPConnection(self, mode, listensocket, netid, version,
myaddr, remoteaddr, packet, reply, msg):
- establishing_ack = Packets.ACK(version, remoteaddr, myaddr, reply)[1]
+ establishing_ack = packets.ACK(version, remoteaddr, myaddr, reply)[1]
# Attempt to confuse the kernel.
self.BounceSocket(listensocket)
@@ -649,14 +462,14 @@
try:
# Check that data sent on the connection goes out on the right interface.
- desc, data = Packets.ACK(version, myaddr, remoteaddr, establishing_ack,
+ desc, data = packets.ACK(version, myaddr, remoteaddr, establishing_ack,
payload=UDP_PAYLOAD)
s.send(UDP_PAYLOAD)
self.ExpectPacketOn(netid, msg + ": expecting %s" % desc, data)
self.BounceSocket(s)
# Keep up our end of the conversation.
- ack = Packets.ACK(version, remoteaddr, myaddr, data)[1]
+ ack = packets.ACK(version, remoteaddr, myaddr, data)[1]
self.BounceSocket(listensocket)
self.ReceivePacketOn(netid, ack)
@@ -677,16 +490,16 @@
# likely working, but a) extra tests are always good and b) extra packets
# like the FIN (and retransmitted FINs) could cause later tests that expect
# no packets to fail.
- desc, fin = Packets.FIN(version, myaddr, remoteaddr, ack)
+ desc, fin = packets.FIN(version, myaddr, remoteaddr, ack)
self.ExpectPacketOn(netid, msg + ": expecting %s after close" % desc, fin)
- desc, finack = Packets.FIN(version, remoteaddr, myaddr, fin)
+ desc, finack = packets.FIN(version, remoteaddr, myaddr, fin)
self.ReceivePacketOn(netid, finack)
# Since we called close() earlier, the userspace socket object is gone, so
# the socket has no UID. If we're doing UID routing, the ack might be routed
# incorrectly. Not much we can do here.
- desc, finackack = Packets.ACK(version, myaddr, remoteaddr, finack)
+ desc, finackack = packets.ACK(version, myaddr, remoteaddr, finack)
if mode != self.MODE_UID:
self.ExpectPacketOn(netid, msg + ": expecting final ack", finackack)
else:
@@ -723,10 +536,10 @@
# subsequent TCP connections use different source ports and
# retransmissions from old connections don't confuse subsequent
# tests.
- desc, packet = Packets.SYN(listenport, version, remoteaddr, myaddr)
+ desc, packet = packets.SYN(listenport, version, remoteaddr, myaddr)
if mode:
- reply_desc, reply = Packets.SYNACK(version, myaddr, remoteaddr,
+ reply_desc, reply = packets.SYNACK(version, myaddr, remoteaddr,
packet)
else:
reply_desc, reply = None, None
@@ -806,14 +619,14 @@
# Expect an NS for that destination on the interface.
myaddr = self.MyAddress(6, netid)
mymac = self.MyMacAddress(netid)
- desc, expected = Packets.NS(myaddr, dstaddr, mymac)
+ desc, expected = packets.NS(myaddr, dstaddr, mymac)
msg = "Sending UDP packet to on-link destination: expecting %s" % desc
time.sleep(0.0001) # Required to make the test work on kernel 3.1(!)
self.ExpectPacketOn(netid, msg, expected)
# Send an NA.
tgtmac = "02:00:00:00:%02x:99" % netid
- _, reply = Packets.NA(dstaddr, myaddr, tgtmac)
+ _, reply = packets.NA(dstaddr, myaddr, tgtmac)
# Don't use ReceivePacketOn, since that uses the router's MAC address as
# the source. Instead, construct our own Ethernet header with source
# MAC of tgtmac.
@@ -823,7 +636,7 @@
# Expect the kernel to send the original UDP packet now that the ND cache
# entry has been populated.
sport = s.getsockname()[1]
- desc, expected = Packets.UDP(6, myaddr, dstaddr, sport=sport)
+ desc, expected = packets.UDP(6, myaddr, dstaddr, sport=sport)
msg = "After NA response, expecting %s" % desc
self.ExpectPacketOn(netid, msg, expected)
@@ -901,10 +714,10 @@
# Send a packet and receive a packet too big.
SendBigPacket(version, s, dstaddr, netid, payload)
- packets = self.ReadAllPacketsOn(netid)
- self.assertEquals(1, len(packets))
- _, toobig = Packets.ICMPPacketTooBig(version, intermediate, srcaddr,
- packets[0])
+ received = self.ReadAllPacketsOn(netid)
+ self.assertEquals(1, len(received))
+ _, toobig = packets.ICMPPacketTooBig(version, intermediate, srcaddr,
+ received[0])
self.ReceivePacketOn(netid, toobig)
# Check that another send on the same socket returns EMSGSIZE.
@@ -1133,7 +946,7 @@
self._TableForNetid(self.iface2), self.PRIORITY_IIF)
def testCrash(self):
- listenport = Packets.RandomPort()
+ listenport = packets.RandomPort()
self.listensocket = net_test.IPv6TCPSocket()
self.listensocket.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)
self.listensocket.bind(("::", listenport))
@@ -1144,21 +957,21 @@
remoteaddr = self.GetRemoteAddress(version)
myaddr = self.MyAddress(version, self.netid)
- desc, syn = Packets.SYN(listenport, version, remoteaddr, myaddr)
- synack_desc, synack = Packets.SYNACK(version, myaddr, remoteaddr, syn)
+ desc, syn = packets.SYN(listenport, version, remoteaddr, myaddr)
+ synack_desc, synack = packets.SYNACK(version, myaddr, remoteaddr, syn)
msg = "Sent %s, expected %s" % (desc, synack_desc)
reply = self._ReceiveAndExpectResponse(self.netid, syn, synack, msg)
- establishing_ack = Packets.ACK(version, remoteaddr, myaddr, reply)[1]
+ establishing_ack = packets.ACK(version, remoteaddr, myaddr, reply)[1]
self.ReceivePacketOn(self.netid, establishing_ack)
accepted, peer = self.listensocket.accept()
remoteport = accepted.getpeername()[1]
accepted.close()
- desc, fin = Packets.FIN(version, myaddr, remoteaddr, establishing_ack)
+ desc, fin = packets.FIN(version, myaddr, remoteaddr, establishing_ack)
self.ExpectPacketOn(self.netid, msg + ": expecting %s after close" % desc, fin)
- desc, finack = Packets.FIN(version, remoteaddr, myaddr, fin)
+ desc, finack = packets.FIN(version, remoteaddr, myaddr, fin)
self.ReceivePacketOn(self.netid, finack)
# Check our socket is now in TIME_WAIT.
diff --git a/net/test/packets.py b/net/test/packets.py
new file mode 100644
index 0000000..a0b75e8
--- /dev/null
+++ b/net/test/packets.py
@@ -0,0 +1,195 @@
+#!/usr/bin/python
+#
+# Copyright 2015 The Android Open Source Project
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import random
+
+from scapy import all as scapy
+from socket import *
+
+import net_test
+
+TCP_FIN = 1
+TCP_SYN = 2
+TCP_RST = 4
+TCP_PSH = 8
+TCP_ACK = 16
+
+TCP_SEQ = 1692871236
+TCP_WINDOW = 14400
+
+PING_IDENT = 0xff19
+PING_PAYLOAD = "foobarbaz"
+PING_SEQ = 3
+PING_TOS = 0x83
+
+# For brevity.
+UDP_PAYLOAD = net_test.UDP_PAYLOAD
+
+
+def RandomPort():
+ return random.randint(1025, 65535)
+
+def _GetIpLayer(version):
+ return {4: scapy.IP, 6: scapy.IPv6}[version]
+
+def _SetPacketTos(packet, tos):
+ if isinstance(packet, scapy.IPv6):
+ packet.tc = tos
+ elif isinstance(packet, scapy.IP):
+ packet.tos = tos
+ else:
+ raise ValueError("Can't find ToS Field")
+
+def UDP(version, srcaddr, dstaddr, sport=0):
+ ip = _GetIpLayer(version)
+ # Can't just use "if sport" because None has meaning (it means unspecified).
+ if sport == 0:
+ sport = RandomPort()
+ return ("UDPv%d packet" % version,
+ ip(src=srcaddr, dst=dstaddr) /
+ scapy.UDP(sport=sport, dport=53) / UDP_PAYLOAD)
+
+def UDPWithOptions(version, srcaddr, dstaddr, sport=0):
+ if version == 4:
+ packet = (scapy.IP(src=srcaddr, dst=dstaddr, ttl=39, tos=0x83) /
+ scapy.UDP(sport=sport, dport=53) /
+ UDP_PAYLOAD)
+ else:
+ packet = (scapy.IPv6(src=srcaddr, dst=dstaddr,
+ fl=0xbeef, hlim=39, tc=0x83) /
+ scapy.UDP(sport=sport, dport=53) /
+ UDP_PAYLOAD)
+ return ("UDPv%d packet with options" % version, packet)
+
+def SYN(dport, version, srcaddr, dstaddr, sport=0, seq=TCP_SEQ):
+ ip = _GetIpLayer(version)
+ if sport == 0:
+ sport = RandomPort()
+ return ("TCP SYN",
+ ip(src=srcaddr, dst=dstaddr) /
+ scapy.TCP(sport=sport, dport=dport,
+ seq=seq, ack=0,
+ flags=TCP_SYN, window=TCP_WINDOW))
+
+def RST(version, srcaddr, dstaddr, packet):
+ ip = _GetIpLayer(version)
+ original = packet.getlayer("TCP")
+ return ("TCP RST",
+ ip(src=srcaddr, dst=dstaddr) /
+ scapy.TCP(sport=original.dport, dport=original.sport,
+ ack=original.seq + 1, seq=None,
+ flags=TCP_RST | TCP_ACK, window=TCP_WINDOW))
+
+def SYNACK(version, srcaddr, dstaddr, packet):
+ ip = _GetIpLayer(version)
+ original = packet.getlayer("TCP")
+ return ("TCP SYN+ACK",
+ ip(src=srcaddr, dst=dstaddr) /
+ scapy.TCP(sport=original.dport, dport=original.sport,
+ ack=original.seq + 1, seq=None,
+ flags=TCP_SYN | TCP_ACK, window=None))
+
+def ACK(version, srcaddr, dstaddr, packet, payload=""):
+ ip = _GetIpLayer(version)
+ original = packet.getlayer("TCP")
+ was_syn_or_fin = (original.flags & (TCP_SYN | TCP_FIN)) != 0
+ ack_delta = was_syn_or_fin + len(original.payload)
+ desc = "TCP data" if payload else "TCP ACK"
+ flags = TCP_ACK | TCP_PSH if payload else TCP_ACK
+ return (desc,
+ ip(src=srcaddr, dst=dstaddr) /
+ scapy.TCP(sport=original.dport, dport=original.sport,
+ ack=original.seq + ack_delta, seq=original.ack,
+ flags=flags, window=TCP_WINDOW) /
+ payload)
+
+def FIN(version, srcaddr, dstaddr, packet):
+ ip = _GetIpLayer(version)
+ original = packet.getlayer("TCP")
+ was_fin = (original.flags & TCP_FIN) != 0
+ return ("TCP FIN",
+ ip(src=srcaddr, dst=dstaddr) /
+ scapy.TCP(sport=original.dport, dport=original.sport,
+ ack=original.seq + was_fin, seq=original.ack,
+ flags=TCP_ACK | TCP_FIN, window=TCP_WINDOW))
+
+def GRE(version, srcaddr, dstaddr, proto, packet):
+ if version == 4:
+ ip = scapy.IP(src=srcaddr, dst=dstaddr, proto=net_test.IPPROTO_GRE)
+ else:
+ ip = scapy.IPv6(src=srcaddr, dst=dstaddr, nh=net_test.IPPROTO_GRE)
+ packet = ip / scapy.GRE(proto=proto) / packet
+ return ("GRE packet", packet)
+
+def ICMPPortUnreachable(version, srcaddr, dstaddr, packet):
+ if version == 4:
+ # Linux hardcodes the ToS on ICMP errors to 0xc0 or greater because of
+ # RFC 1812 4.3.2.5 (!).
+ return ("ICMPv4 port unreachable",
+ scapy.IP(src=srcaddr, dst=dstaddr, proto=1, tos=0xc0) /
+ scapy.ICMPerror(type=3, code=3) / packet)
+ else:
+ return ("ICMPv6 port unreachable",
+ scapy.IPv6(src=srcaddr, dst=dstaddr) /
+ scapy.ICMPv6DestUnreach(code=4) / packet)
+
+def ICMPPacketTooBig(version, srcaddr, dstaddr, packet):
+ if version == 4:
+ return ("ICMPv4 fragmentation needed",
+ scapy.IP(src=srcaddr, dst=dstaddr, proto=1) /
+ scapy.ICMPerror(type=3, code=4, unused=1280) / str(packet)[:64])
+ else:
+ udp = packet.getlayer("UDP")
+ udp.payload = str(udp.payload)[:1280-40-8]
+ return ("ICMPv6 Packet Too Big",
+ scapy.IPv6(src=srcaddr, dst=dstaddr) /
+ scapy.ICMPv6PacketTooBig() / str(packet)[:1232])
+
+def ICMPEcho(version, srcaddr, dstaddr):
+ ip = _GetIpLayer(version)
+ icmp = {4: scapy.ICMP, 6: scapy.ICMPv6EchoRequest}[version]
+ packet = (ip(src=srcaddr, dst=dstaddr) /
+ icmp(id=PING_IDENT, seq=PING_SEQ) / PING_PAYLOAD)
+ _SetPacketTos(packet, PING_TOS)
+ return ("ICMPv%d echo" % version, packet)
+
+def ICMPReply(version, srcaddr, dstaddr, packet):
+ ip = _GetIpLayer(version)
+ # Scapy doesn't provide an ICMP echo reply constructor.
+ icmpv4_reply = lambda **kwargs: scapy.ICMP(type=0, **kwargs)
+ icmp = {4: icmpv4_reply, 6: scapy.ICMPv6EchoReply}[version]
+ packet = (ip(src=srcaddr, dst=dstaddr) /
+ icmp(id=PING_IDENT, seq=PING_SEQ) / PING_PAYLOAD)
+ # IPv6 only started copying the tclass to echo replies in 3.14.
+ if version == 4 or net_test.LINUX_VERSION >= (3, 14):
+ _SetPacketTos(packet, PING_TOS)
+ return ("ICMPv%d echo reply" % version, packet)
+
+def NS(srcaddr, tgtaddr, srcmac):
+ solicited = inet_pton(AF_INET6, tgtaddr)
+ last3bytes = tuple([ord(b) for b in solicited[-3:]])
+ solicited = "ff02::1:ff%02x:%02x%02x" % last3bytes
+ packet = (scapy.IPv6(src=srcaddr, dst=solicited) /
+ scapy.ICMPv6ND_NS(tgt=tgtaddr) /
+ scapy.ICMPv6NDOptSrcLLAddr(lladdr=srcmac))
+ return ("ICMPv6 NS", packet)
+
+def NA(srcaddr, dstaddr, srcmac):
+ packet = (scapy.IPv6(src=srcaddr, dst=dstaddr) /
+ scapy.ICMPv6ND_NA(tgt=srcaddr, R=0, S=1, O=1) /
+ scapy.ICMPv6NDOptDstLLAddr(lladdr=srcmac))
+ return ("ICMPv6 NA", packet)
+