8800f732f8
- Make framework.py classes a subset of asfframework.py classes - Remove all packet related code from asfframework.py - Add test class and test case set up debug output to log - Repatriate packet tests from asf to test directory - Remove non-packet related code from framework.py and inherit them from asfframework.py classes - Clean up unused import variables - Re-enable BFD tests on Ubuntu 22.04 and fix intermittent test failures in echo_looped_back testcases (where # control packets verified but not guaranteed to be received during test) - Re-enable Wireguard tests on Ubuntu 22.04 and fix intermittent test failures in handshake ratelimiting testcases and event testcase - Run Wiregard testcase suites solo - Improve debug output in log.txt - Increase VCL/LDP post sleep timeout to allow iperf server to finish cleanly. - Fix pcap history files to be sorted by suite and testcase and ensure order/timestamp is correct based on creation in the testcase. - Decode pcap files for each suite and testcase for all errors or if configured via comandline option / env var - Improve vpp corefile detection to allow complete corefile generation - Disable vm vpp interfaces testcases on debian11 - Clean up failed unittest dir when retrying failed testcases and unify testname directory and failed linknames into framwork functions Type: test Change-Id: I0764f79ea5bb639d278bf635ed2408d4d5220e1e Signed-off-by: Dave Wallace <dwallacelf@gmail.com>
478 lines
14 KiB
Python
478 lines
14 KiB
Python
#!/usr/bin/env python3
|
|
|
|
import unittest
|
|
|
|
from config import config
|
|
from framework import VppTestCase
|
|
from asfframework import VppTestRunner
|
|
from vpp_ip_route import (
|
|
VppIpRoute,
|
|
VppRoutePath,
|
|
VppIpTable,
|
|
)
|
|
from vpp_acl import AclRule, VppAcl
|
|
|
|
from scapy.packet import Raw
|
|
from scapy.layers.l2 import Ether
|
|
from scapy.layers.inet import IP, UDP
|
|
from scapy.layers.inet6 import IPv6
|
|
from ipaddress import IPv4Network, IPv6Network
|
|
|
|
from vpp_object import VppObject
|
|
|
|
NUM_PKTS = 67
|
|
|
|
|
|
def find_abf_policy(test, id):
|
|
policies = test.vapi.abf_policy_dump()
|
|
for p in policies:
|
|
if id == p.policy.policy_id:
|
|
return True
|
|
return False
|
|
|
|
|
|
def find_abf_itf_attach(test, id, sw_if_index):
|
|
attachs = test.vapi.abf_itf_attach_dump()
|
|
for a in attachs:
|
|
if id == a.attach.policy_id and sw_if_index == a.attach.sw_if_index:
|
|
return True
|
|
return False
|
|
|
|
|
|
class VppAbfPolicy(VppObject):
|
|
def __init__(self, test, policy_id, acl, paths):
|
|
self._test = test
|
|
self.policy_id = policy_id
|
|
self.acl = acl
|
|
self.paths = paths
|
|
self.encoded_paths = []
|
|
for path in self.paths:
|
|
self.encoded_paths.append(path.encode())
|
|
|
|
def add_vpp_config(self):
|
|
self._test.vapi.abf_policy_add_del(
|
|
1,
|
|
{
|
|
"policy_id": self.policy_id,
|
|
"acl_index": self.acl.acl_index,
|
|
"n_paths": len(self.paths),
|
|
"paths": self.encoded_paths,
|
|
},
|
|
)
|
|
self._test.registry.register(self, self._test.logger)
|
|
|
|
def remove_vpp_config(self):
|
|
self._test.vapi.abf_policy_add_del(
|
|
0,
|
|
{
|
|
"policy_id": self.policy_id,
|
|
"acl_index": self.acl.acl_index,
|
|
"n_paths": len(self.paths),
|
|
"paths": self.encoded_paths,
|
|
},
|
|
)
|
|
|
|
def query_vpp_config(self):
|
|
return find_abf_policy(self._test, self.policy_id)
|
|
|
|
def object_id(self):
|
|
return "abf-policy-%d" % self.policy_id
|
|
|
|
|
|
class VppAbfAttach(VppObject):
|
|
def __init__(self, test, policy_id, sw_if_index, priority, is_ipv6=0):
|
|
self._test = test
|
|
self.policy_id = policy_id
|
|
self.sw_if_index = sw_if_index
|
|
self.priority = priority
|
|
self.is_ipv6 = is_ipv6
|
|
|
|
def add_vpp_config(self):
|
|
self._test.vapi.abf_itf_attach_add_del(
|
|
1,
|
|
{
|
|
"policy_id": self.policy_id,
|
|
"sw_if_index": self.sw_if_index,
|
|
"priority": self.priority,
|
|
"is_ipv6": self.is_ipv6,
|
|
},
|
|
)
|
|
self._test.registry.register(self, self._test.logger)
|
|
|
|
def remove_vpp_config(self):
|
|
self._test.vapi.abf_itf_attach_add_del(
|
|
0,
|
|
{
|
|
"policy_id": self.policy_id,
|
|
"sw_if_index": self.sw_if_index,
|
|
"priority": self.priority,
|
|
"is_ipv6": self.is_ipv6,
|
|
},
|
|
)
|
|
|
|
def query_vpp_config(self):
|
|
return find_abf_itf_attach(self._test, self.policy_id, self.sw_if_index)
|
|
|
|
def object_id(self):
|
|
return "abf-attach-%d-%d" % (self.policy_id, self.sw_if_index)
|
|
|
|
|
|
@unittest.skipIf(
|
|
"acl" in config.excluded_plugins,
|
|
"Exclude ABF plugin tests due to absence of ACL plugin",
|
|
)
|
|
@unittest.skipIf("abf" in config.excluded_plugins, "Exclude ABF plugin tests")
|
|
class TestAbf(VppTestCase):
|
|
"""ABF Test Case"""
|
|
|
|
@classmethod
|
|
def setUpClass(cls):
|
|
super(TestAbf, cls).setUpClass()
|
|
|
|
@classmethod
|
|
def tearDownClass(cls):
|
|
super(TestAbf, cls).tearDownClass()
|
|
|
|
def setUp(self):
|
|
super(TestAbf, self).setUp()
|
|
|
|
self.create_pg_interfaces(range(5))
|
|
|
|
for i in self.pg_interfaces[:4]:
|
|
i.admin_up()
|
|
i.config_ip4()
|
|
i.resolve_arp()
|
|
i.config_ip6()
|
|
i.resolve_ndp()
|
|
|
|
def tearDown(self):
|
|
for i in self.pg_interfaces:
|
|
i.unconfig_ip4()
|
|
i.unconfig_ip6()
|
|
i.admin_down()
|
|
super(TestAbf, self).tearDown()
|
|
|
|
def test_abf4(self):
|
|
"""IPv4 ACL Based Forwarding"""
|
|
|
|
#
|
|
# We are not testing the various matching capabilities
|
|
# of ACLs, that's done elsewhere. Here ware are testing
|
|
# the application of ACLs to a forwarding path to achieve
|
|
# ABF
|
|
# So we construct just a few ACLs to ensure the ABF policies
|
|
# are correctly constructed and used. And a few path types
|
|
# to test the API path decoding.
|
|
#
|
|
|
|
#
|
|
# Rule 1
|
|
#
|
|
rule_1 = AclRule(
|
|
is_permit=1,
|
|
proto=17,
|
|
ports=1234,
|
|
src_prefix=IPv4Network("1.1.1.1/32"),
|
|
dst_prefix=IPv4Network("1.1.1.2/32"),
|
|
)
|
|
acl_1 = VppAcl(self, rules=[rule_1])
|
|
acl_1.add_vpp_config()
|
|
|
|
#
|
|
# ABF policy for ACL 1 - path via interface 1
|
|
#
|
|
abf_1 = VppAbfPolicy(
|
|
self, 10, acl_1, [VppRoutePath(self.pg1.remote_ip4, self.pg1.sw_if_index)]
|
|
)
|
|
abf_1.add_vpp_config()
|
|
|
|
#
|
|
# Attach the policy to input interface Pg0
|
|
#
|
|
attach_1 = VppAbfAttach(self, 10, self.pg0.sw_if_index, 50)
|
|
attach_1.add_vpp_config()
|
|
|
|
#
|
|
# fire in packet matching the ACL src,dst. If it's forwarded
|
|
# then the ABF was successful, since default routing will drop it
|
|
#
|
|
p_1 = (
|
|
Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
|
|
/ IP(src="1.1.1.1", dst="1.1.1.2")
|
|
/ UDP(sport=1234, dport=1234)
|
|
/ Raw(b"\xa5" * 100)
|
|
)
|
|
self.send_and_expect(self.pg0, p_1 * NUM_PKTS, self.pg1)
|
|
|
|
#
|
|
# Attach a 'better' priority policy to the same interface
|
|
#
|
|
abf_2 = VppAbfPolicy(
|
|
self, 11, acl_1, [VppRoutePath(self.pg2.remote_ip4, self.pg2.sw_if_index)]
|
|
)
|
|
abf_2.add_vpp_config()
|
|
attach_2 = VppAbfAttach(self, 11, self.pg0.sw_if_index, 40)
|
|
attach_2.add_vpp_config()
|
|
|
|
self.send_and_expect(self.pg0, p_1 * NUM_PKTS, self.pg2)
|
|
|
|
#
|
|
# Attach a policy with priority in the middle
|
|
#
|
|
abf_3 = VppAbfPolicy(
|
|
self, 12, acl_1, [VppRoutePath(self.pg3.remote_ip4, self.pg3.sw_if_index)]
|
|
)
|
|
abf_3.add_vpp_config()
|
|
attach_3 = VppAbfAttach(self, 12, self.pg0.sw_if_index, 45)
|
|
attach_3.add_vpp_config()
|
|
|
|
self.send_and_expect(self.pg0, p_1 * NUM_PKTS, self.pg2)
|
|
|
|
#
|
|
# remove the best priority
|
|
#
|
|
attach_2.remove_vpp_config()
|
|
self.send_and_expect(self.pg0, p_1 * NUM_PKTS, self.pg3)
|
|
|
|
#
|
|
# Attach one of the same policies to Pg1
|
|
#
|
|
attach_4 = VppAbfAttach(self, 12, self.pg1.sw_if_index, 45)
|
|
attach_4.add_vpp_config()
|
|
|
|
p_2 = (
|
|
Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
|
|
/ IP(src="1.1.1.1", dst="1.1.1.2")
|
|
/ UDP(sport=1234, dport=1234)
|
|
/ Raw(b"\xa5" * 100)
|
|
)
|
|
self.send_and_expect(self.pg1, p_2 * NUM_PKTS, self.pg3)
|
|
|
|
#
|
|
# detach the policy from PG1, now expect traffic to be dropped
|
|
#
|
|
attach_4.remove_vpp_config()
|
|
|
|
self.send_and_assert_no_replies(self.pg1, p_2 * NUM_PKTS, "Detached")
|
|
|
|
#
|
|
# Swap to route via a next-hop in the non-default table
|
|
#
|
|
table_20 = VppIpTable(self, 20)
|
|
table_20.add_vpp_config()
|
|
|
|
self.pg4.set_table_ip4(table_20.table_id)
|
|
self.pg4.admin_up()
|
|
self.pg4.config_ip4()
|
|
self.pg4.resolve_arp()
|
|
|
|
abf_13 = VppAbfPolicy(
|
|
self,
|
|
13,
|
|
acl_1,
|
|
[
|
|
VppRoutePath(
|
|
self.pg4.remote_ip4, 0xFFFFFFFF, nh_table_id=table_20.table_id
|
|
)
|
|
],
|
|
)
|
|
abf_13.add_vpp_config()
|
|
attach_5 = VppAbfAttach(self, 13, self.pg0.sw_if_index, 30)
|
|
attach_5.add_vpp_config()
|
|
|
|
self.send_and_expect(self.pg0, p_1 * NUM_PKTS, self.pg4)
|
|
|
|
self.pg4.unconfig_ip4()
|
|
self.pg4.set_table_ip4(0)
|
|
|
|
def test_abf6(self):
|
|
"""IPv6 ACL Based Forwarding"""
|
|
|
|
#
|
|
# Simple test for matching IPv6 packets
|
|
#
|
|
|
|
#
|
|
# Rule 1
|
|
#
|
|
rule_1 = AclRule(
|
|
is_permit=1,
|
|
proto=17,
|
|
ports=1234,
|
|
src_prefix=IPv6Network("2001::2/128"),
|
|
dst_prefix=IPv6Network("2001::1/128"),
|
|
)
|
|
acl_1 = VppAcl(self, rules=[rule_1])
|
|
acl_1.add_vpp_config()
|
|
|
|
#
|
|
# ABF policy for ACL 1 - path via interface 1
|
|
#
|
|
abf_1 = VppAbfPolicy(self, 10, acl_1, [VppRoutePath("3001::1", 0xFFFFFFFF)])
|
|
abf_1.add_vpp_config()
|
|
|
|
attach_1 = VppAbfAttach(self, 10, self.pg0.sw_if_index, 45, is_ipv6=True)
|
|
attach_1.add_vpp_config()
|
|
|
|
#
|
|
# a packet matching the rule
|
|
#
|
|
p = (
|
|
Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
|
|
/ IPv6(src="2001::2", dst="2001::1")
|
|
/ UDP(sport=1234, dport=1234)
|
|
/ Raw(b"\xa5" * 100)
|
|
)
|
|
|
|
#
|
|
# packets are dropped because there is no route to the policy's
|
|
# next hop
|
|
#
|
|
self.send_and_assert_no_replies(self.pg1, p * NUM_PKTS, "no route")
|
|
|
|
#
|
|
# add a route resolving the next-hop
|
|
#
|
|
route = VppIpRoute(
|
|
self,
|
|
"3001::1",
|
|
32,
|
|
[VppRoutePath(self.pg1.remote_ip6, self.pg1.sw_if_index)],
|
|
)
|
|
route.add_vpp_config()
|
|
|
|
#
|
|
# now expect packets forwarded.
|
|
#
|
|
self.send_and_expect(self.pg0, p * NUM_PKTS, self.pg1)
|
|
|
|
def test_abf4_deny(self):
|
|
"""IPv4 ACL Deny Rule"""
|
|
import ipaddress
|
|
|
|
#
|
|
# Rules 1/2
|
|
#
|
|
pg0_subnet = ipaddress.ip_network(self.pg0.local_ip4_prefix, strict=False)
|
|
pg2_subnet = ipaddress.ip_network(self.pg2.local_ip4_prefix, strict=False)
|
|
pg3_subnet = ipaddress.ip_network(self.pg3.local_ip4_prefix, strict=False)
|
|
rule_deny = AclRule(
|
|
is_permit=0,
|
|
proto=17,
|
|
ports=1234,
|
|
src_prefix=IPv4Network(pg0_subnet),
|
|
dst_prefix=IPv4Network(pg3_subnet),
|
|
)
|
|
rule_permit = AclRule(
|
|
is_permit=1,
|
|
proto=17,
|
|
ports=1234,
|
|
src_prefix=IPv4Network(pg0_subnet),
|
|
dst_prefix=IPv4Network(pg2_subnet),
|
|
)
|
|
acl_1 = VppAcl(self, rules=[rule_deny, rule_permit])
|
|
acl_1.add_vpp_config()
|
|
|
|
#
|
|
# ABF policy for ACL 1 - path via interface 1
|
|
#
|
|
abf_1 = VppAbfPolicy(
|
|
self, 10, acl_1, [VppRoutePath(self.pg1.remote_ip4, self.pg1.sw_if_index)]
|
|
)
|
|
abf_1.add_vpp_config()
|
|
|
|
#
|
|
# Attach the policy to input interface Pg0
|
|
#
|
|
attach_1 = VppAbfAttach(self, 10, self.pg0.sw_if_index, 50)
|
|
attach_1.add_vpp_config()
|
|
|
|
#
|
|
# a packet matching the deny rule
|
|
#
|
|
p_deny = (
|
|
Ether(src=self.pg0.remote_mac, dst=self.pg3.remote_mac)
|
|
/ IP(src=self.pg0.remote_ip4, dst=self.pg3.remote_ip4)
|
|
/ UDP(sport=1234, dport=1234)
|
|
/ Raw(b"\xa5" * 100)
|
|
)
|
|
self.send_and_expect(self.pg0, p_deny * NUM_PKTS, self.pg3)
|
|
|
|
#
|
|
# a packet matching the permit rule
|
|
#
|
|
p_permit = (
|
|
Ether(src=self.pg0.remote_mac, dst=self.pg2.remote_mac)
|
|
/ IP(src=self.pg0.remote_ip4, dst=self.pg2.remote_ip4)
|
|
/ UDP(sport=1234, dport=1234)
|
|
/ Raw(b"\xa5" * 100)
|
|
)
|
|
self.send_and_expect(self.pg0, p_permit * NUM_PKTS, self.pg1)
|
|
|
|
def test_abf6_deny(self):
|
|
"""IPv6 ACL Deny Rule"""
|
|
import ipaddress
|
|
|
|
#
|
|
# Rules 1/2
|
|
#
|
|
pg0_subnet = ipaddress.ip_network(self.pg0.local_ip6_prefix, strict=False)
|
|
pg2_subnet = ipaddress.ip_network(self.pg2.local_ip6_prefix, strict=False)
|
|
pg3_subnet = ipaddress.ip_network(self.pg3.local_ip6_prefix, strict=False)
|
|
rule_deny = AclRule(
|
|
is_permit=0,
|
|
proto=17,
|
|
ports=1234,
|
|
src_prefix=IPv6Network(pg0_subnet),
|
|
dst_prefix=IPv6Network(pg3_subnet),
|
|
)
|
|
rule_permit = AclRule(
|
|
is_permit=1,
|
|
proto=17,
|
|
ports=1234,
|
|
src_prefix=IPv6Network(pg0_subnet),
|
|
dst_prefix=IPv6Network(pg2_subnet),
|
|
)
|
|
acl_1 = VppAcl(self, rules=[rule_deny, rule_permit])
|
|
acl_1.add_vpp_config()
|
|
|
|
#
|
|
# ABF policy for ACL 1 - path via interface 1
|
|
#
|
|
abf_1 = VppAbfPolicy(
|
|
self, 10, acl_1, [VppRoutePath(self.pg1.remote_ip6, self.pg1.sw_if_index)]
|
|
)
|
|
abf_1.add_vpp_config()
|
|
|
|
#
|
|
# Attach the policy to input interface Pg0
|
|
#
|
|
attach_1 = VppAbfAttach(self, 10, self.pg0.sw_if_index, 50, is_ipv6=1)
|
|
attach_1.add_vpp_config()
|
|
|
|
#
|
|
# a packet matching the deny rule
|
|
#
|
|
p_deny = (
|
|
Ether(src=self.pg0.remote_mac, dst=self.pg3.remote_mac)
|
|
/ IPv6(src=self.pg0.remote_ip6, dst=self.pg3.remote_ip6)
|
|
/ UDP(sport=1234, dport=1234)
|
|
/ Raw(b"\xa5" * 100)
|
|
)
|
|
self.send_and_expect(self.pg0, p_deny * NUM_PKTS, self.pg3)
|
|
|
|
#
|
|
# a packet matching the permit rule
|
|
#
|
|
p_permit = (
|
|
Ether(src=self.pg0.remote_mac, dst=self.pg2.remote_mac)
|
|
/ IPv6(src=self.pg0.remote_ip6, dst=self.pg2.remote_ip6)
|
|
/ UDP(sport=1234, dport=1234)
|
|
/ Raw(b"\xa5" * 100)
|
|
)
|
|
self.send_and_expect(self.pg0, p_permit * NUM_PKTS, self.pg1)
|
|
|
|
|
|
if __name__ == "__main__":
|
|
unittest.main(testRunner=VppTestRunner)
|