tests: add map-t fragmentation verifications

Type: test

Change-Id: I5522e88ee178d0563c246895393e835d125f1b81
Signed-off-by: Alexander Chernavin <achernavin@netgate.com>
This commit is contained in:
Alexander Chernavin
2020-01-17 08:31:04 -05:00
committed by Paul Vinciguerra
parent f31acfafb2
commit 31b1a6ce1d

View File

@ -11,8 +11,8 @@ from util import fragment_rfc791, fragment_rfc8200
import scapy.compat
from scapy.layers.l2 import Ether
from scapy.packet import Raw
from scapy.layers.inet import IP, UDP, ICMP, TCP, fragment
from scapy.layers.inet6 import IPv6, ICMPv6TimeExceeded
from scapy.layers.inet import IP, UDP, ICMP, TCP
from scapy.layers.inet6 import IPv6, ICMPv6TimeExceeded, IPv6ExtHdrFragment
class TestMAP(VppTestCase):
@ -435,6 +435,26 @@ class TestMAP(VppTestCase):
def validate(self, rx, expected):
self.assertEqual(rx, expected.__class__(scapy.compat.raw(expected)))
def validate_frag(self, p6_frag, p_ip6_expected):
self.assertFalse(p6_frag.haslayer(IP))
self.assertTrue(p6_frag.haslayer(IPv6))
self.assertTrue(p6_frag.haslayer(IPv6ExtHdrFragment))
self.assertEqual(p6_frag[IPv6].src, p_ip6_expected.src)
self.assertEqual(p6_frag[IPv6].dst, p_ip6_expected.dst)
def validate_frag_payload_len(self, rx, proto, payload_len_expected):
payload_total = 0
for p in rx:
payload_total += p[IPv6].plen
# First fragment has proto
payload_total -= len(proto())
# Every fragment has IPv6 fragment header
payload_total -= len(IPv6ExtHdrFragment()) * len(rx)
self.assertEqual(payload_total, payload_len_expected)
def payload(self, len):
return 'x' * len
@ -579,28 +599,34 @@ class TestMAP(VppTestCase):
self.send_and_assert_no_replies(self.pg1, p6*1)
# Packet fragmentation
payload = UDP(sport=40000, dport=4000) / self.payload(1453)
payload_len = 1453
payload = UDP(sport=40000, dport=4000) / self.payload(payload_len)
p4 = (p_ether / p_ip4 / payload)
self.pg_enable_capture()
self.pg0.add_stream(p4)
self.pg_start()
rx = self.pg1.get_capture(2)
p_ip6_translated = IPv6(src='1234:5678:90ab:cdef:ac:1001:200:0',
dst='2001:db8:1e0::c0a8:1:e')
for p in rx:
pass
# TODO: Manual validation
# self.validate(p[1], icmp4_reply)
self.validate_frag(p, p_ip6_translated)
self.validate_frag_payload_len(rx, UDP, payload_len)
# Packet fragmentation send fragments
payload = UDP(sport=40000, dport=4000) / self.payload(1453)
payload = UDP(sport=40000, dport=4000) / self.payload(payload_len)
p4 = (p_ether / p_ip4 / payload)
frags = fragment(p4, fragsize=1000)
frags = fragment_rfc791(p4, fragsize=1000)
self.pg_enable_capture()
self.pg0.add_stream(frags)
self.pg_start()
rx = self.pg1.get_capture(2)
for p in rx:
pass
# p.show2()
self.validate_frag(p, p_ip6_translated)
self.validate_frag_payload_len(rx, UDP, payload_len)
# reass_pkt = reassemble(rx)
# p4_reply.ttl -= 1