8800f732f8
- Make framework.py classes a subset of asfframework.py classes - Remove all packet related code from asfframework.py - Add test class and test case set up debug output to log - Repatriate packet tests from asf to test directory - Remove non-packet related code from framework.py and inherit them from asfframework.py classes - Clean up unused import variables - Re-enable BFD tests on Ubuntu 22.04 and fix intermittent test failures in echo_looped_back testcases (where # control packets verified but not guaranteed to be received during test) - Re-enable Wireguard tests on Ubuntu 22.04 and fix intermittent test failures in handshake ratelimiting testcases and event testcase - Run Wireguard testcase suites solo - Improve debug output in log.txt - Increase VCL/LDP post sleep timeout to allow iperf server to finish cleanly. - Fix pcap history files to be sorted by suite and testcase and ensure order/timestamp is correct based on creation in the testcase. - Decode pcap files for each suite and testcase for all errors or if configured via commandline option / env var - Improve vpp corefile detection to allow complete corefile generation - Disable vm vpp interfaces testcases on debian11 - Clean up failed unittest dir when retrying failed testcases and unify testname directory and failed linknames into framework functions Type: test Change-Id: I0764f79ea5bb639d278bf635ed2408d4d5220e1e Signed-off-by: Dave Wallace <dwallacelf@gmail.com>
142 lines
4.8 KiB
Python
142 lines
4.8 KiB
Python
#!/usr/bin/env python3
|
|
|
|
import unittest
|
|
import ipaddress
|
|
from framework import VppTestCase
|
|
from asfframework import VppTestRunner
|
|
|
|
from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest, ICMPv6DestUnreach
|
|
from scapy.layers.l2 import Ether
|
|
from scapy.packet import Raw
|
|
|
|
|
|
class TestNPT66(VppTestCase):
    """NPTv6 Test Case"""

    extra_vpp_plugin_config = [
        "plugin npt66_plugin.so {enable}",
    ]

    def setUp(self):
        """Create two pg interfaces, bring them up and configure IPv6 on them."""
        super().setUp()

        # create 2 pg interfaces
        self.create_pg_interfaces(range(2))

        for i in self.pg_interfaces:
            i.admin_up()
            i.config_ip6()
            i.resolve_ndp()

    def tearDown(self):
        """Unconfigure IPv6 on the pg interfaces and bring them down."""
        for i in self.pg_interfaces:
            i.unconfig_ip6()
            i.admin_down()
        super().tearDown()

    def _verify_echo_cksum(self, pkt):
        """Assert that the ICMPv6 echo request checksum in *pkt* is correct.

        The received checksum is remembered, the field is deleted and the
        packet is rebuilt from its bytes so that scapy fills the checksum in
        again; both values must match.

        :param pkt: received packet containing an ICMPv6EchoRequest layer
            (its checksum field is cleared as a side effect).
        :returns: the packet rebuilt from bytes.
        """
        received_cksum = pkt[ICMPv6EchoRequest].cksum
        del pkt[ICMPv6EchoRequest].cksum
        rebuilt = pkt.__class__(bytes(pkt))
        self.assertEqual(received_cksum, rebuilt[ICMPv6EchoRequest].cksum)
        return rebuilt

    def send_and_verify(self, internal, reply_icmp_error=False):
        """Send an echo request pg0 -> pg1 and a reply pg1 -> pg0, checking both.

        :param internal: internal prefix string; the request is sourced from
            the first address after the prefix's network address.
        :param reply_icmp_error: when True the reply is an ICMPv6 destination
            unreachable message embedding the received packet, otherwise it is
            a plain ICMPv6 echo request carrying ``b"Reply"``.
        """
        sendif = self.pg0
        recvif = self.pg1
        local_mac = self.pg0.local_mac
        remote_mac = self.pg0.remote_mac
        src = ipaddress.ip_interface(internal).ip + 1
        dst = self.pg1.remote_ip6

        p = (
            Ether(dst=local_mac, src=remote_mac)
            / IPv6(src=src, dst=dst)
            / ICMPv6EchoRequest()
            / Raw(b"Request")
        )
        # Every packet coming back towards pg0 must be addressed to the
        # original sender.
        expected_reply_dst = str(p[IPv6].src)

        rxs = self.send_and_expect(sendif, p, recvif)
        for rx in rxs:
            rx = self._verify_echo_cksum(rx)

            # Generate a reply: either an ICMPv6 error quoting the packet
            # received on pg1, or an ordinary echo packet.
            if reply_icmp_error:
                reply_payload = ICMPv6DestUnreach() / rx[IPv6]
            else:
                reply_payload = ICMPv6EchoRequest() / Raw(b"Reply")
            reply = (
                Ether(dst=rx[Ether].src, src=local_mac)
                / IPv6(src=rx[IPv6].dst, dst=rx[IPv6].src)
                / reply_payload
            )

            replies = self.send_and_expect(recvif, reply, sendif)
            for r in replies:
                self.assertEqual(expected_reply_dst, r[IPv6].dst)
                self._verify_echo_cksum(r)

    def do_test(self, internal, external, reply_icmp_error=False):
        """Add NPT66 binding and send packet

        The binding is installed on pg1, a route for the internal prefix is
        added via pg0's remote address, traffic is verified, and the binding
        is removed again even when verification fails so that a failing case
        does not leak the binding into later tests.

        :param internal: internal prefix string passed to the binding API.
        :param external: external prefix string passed to the binding API.
        :param reply_icmp_error: forwarded to :meth:`send_and_verify`.
        """
        self.vapi.npt66_binding_add_del(
            sw_if_index=self.pg1.sw_if_index,
            internal=internal,
            external=external,
            is_add=True,
        )
        try:
            ## TODO use route api
            # NOTE(review): this route is never removed afterwards - confirm
            # whether that is intentional.
            self.vapi.cli(f"ip route add {internal} via {self.pg0.remote_ip6}")

            self.send_and_verify(internal, reply_icmp_error=reply_icmp_error)
        finally:
            self.vapi.npt66_binding_add_del(
                sw_if_index=self.pg1.sw_if_index,
                internal=internal,
                external=external,
                is_add=False,
            )

    def test_npt66_simple(self):
        """Send and receive a packet through NPT66"""

        self.do_test("fd00:0000:0000::/48", "2001:4650:c3ed::/48")
        self.do_test("fc00:1::/48", "2001:db8:1::/48")
        self.do_test("fc00:1234::/32", "2001:db8:1::/32")
        self.do_test("fc00:1234::/63", "2001:db8:1::/56")

    def test_npt66_icmp6(self):
        """Send and receive an ICMPv6 error message through NPT66"""

        # Test ICMP6 error packets
        self.do_test(
            "fd00:0000:0000::/48", "2001:4650:c3ed::/48", reply_icmp_error=True
        )
        self.do_test("fc00:1::/48", "2001:db8:1::/48", reply_icmp_error=True)
        self.do_test("fc00:1234::/32", "2001:db8:1::/32", reply_icmp_error=True)
        self.do_test("fc00:1234::/63", "2001:db8:1::/56", reply_icmp_error=True)
|
|
|
|
|
|
if __name__ == "__main__":
    # Executed directly: run this module's test cases with the VPP runner.
    unittest.main(testRunner=VppTestRunner)
|