tests: add fast path ipv6 python tests for outbound policy matching

This patch introduces set of python tests for fast path ipv6, based on
ipv4 tests. Some missing parts of ipsec framework has been added
in order to test ipv6 implementation.

Type: feature
Signed-off-by: Piotr Bronowski <piotrx.bronowski@intel.com>
Change-Id: Icc13322787d76485c08106bad2cb071947ad9846
This commit is contained in:
Piotr Bronowski
2022-07-08 12:45:51 +00:00
committed by Fan Zhang
parent 86f8208af4
commit 651cc01b64
2 changed files with 857 additions and 6 deletions

View File

@ -2088,5 +2088,200 @@ class SpdFlowCacheTemplate(IPSecIPv4Fwd):
return False
class IPSecIPv6Fwd(VppTestCase):
"""Test IPSec by capturing and verifying IPv6 forwarded pkts"""
@classmethod
def setUpConstants(cls):
super(IPSecIPv6Fwd, cls).setUpConstants()
def setUp(self):
super(IPSecIPv6Fwd, self).setUp()
# store SPD objects so we can remove configs on tear down
self.spd_objs = []
self.spd_policies = []
def tearDown(self):
# remove SPD policies
for obj in self.spd_policies:
obj.remove_vpp_config()
self.spd_policies = []
# remove SPD items (interface bindings first, then SPD)
for obj in reversed(self.spd_objs):
obj.remove_vpp_config()
self.spd_objs = []
# close down pg intfs
for pg in self.pg_interfaces:
pg.unconfig_ip6()
pg.admin_down()
super(IPSecIPv6Fwd, self).tearDown()
def create_interfaces(self, num_ifs=2):
# create interfaces pg0 ... pg<num_ifs>
self.create_pg_interfaces(range(num_ifs))
for pg in self.pg_interfaces:
# put the interface up
pg.admin_up()
# configure IPv6 address on the interface
pg.config_ip6()
pg.resolve_ndp()
self.logger.info(self.vapi.ppcli("show int addr"))
def spd_create_and_intf_add(self, spd_id, pg_list):
spd = VppIpsecSpd(self, spd_id)
spd.add_vpp_config()
self.spd_objs.append(spd)
for pg in pg_list:
spdItf = VppIpsecSpdItfBinding(self, spd, pg)
spdItf.add_vpp_config()
self.spd_objs.append(spdItf)
def get_policy(self, policy_type):
e = VppEnum.vl_api_ipsec_spd_action_t
if policy_type == "protect":
return e.IPSEC_API_SPD_ACTION_PROTECT
elif policy_type == "bypass":
return e.IPSEC_API_SPD_ACTION_BYPASS
elif policy_type == "discard":
return e.IPSEC_API_SPD_ACTION_DISCARD
else:
raise Exception("Invalid policy type: %s", policy_type)
def spd_add_rem_policy(
self,
spd_id,
src_if,
dst_if,
proto,
is_out,
priority,
policy_type,
remove=False,
all_ips=False,
ip_range=False,
local_ip_start=ip_address("0::0"),
local_ip_stop=ip_address("ffff:ffff:ffff:ffff:ffff:ffff:ffff:ffff"),
remote_ip_start=ip_address("0::0"),
remote_ip_stop=ip_address("ffff:ffff:ffff:ffff:ffff:ffff:ffff:ffff"),
remote_port_start=0,
remote_port_stop=65535,
local_port_start=0,
local_port_stop=65535,
):
spd = VppIpsecSpd(self, spd_id)
if all_ips:
src_range_low = ip_address("0::0")
src_range_high = ip_address("ffff:ffff:ffff:ffff:ffff:ffff:ffff:ffff")
dst_range_low = ip_address("0::0")
dst_range_high = ip_address("ffff:ffff:ffff:ffff:ffff:ffff:ffff:ffff")
elif ip_range:
src_range_low = local_ip_start
src_range_high = local_ip_stop
dst_range_low = remote_ip_start
dst_range_high = remote_ip_stop
else:
src_range_low = src_if.remote_ip6
src_range_high = src_if.remote_ip6
dst_range_low = dst_if.remote_ip6
dst_range_high = dst_if.remote_ip6
spdEntry = VppIpsecSpdEntry(
self,
spd,
0,
src_range_low,
src_range_high,
dst_range_low,
dst_range_high,
proto,
priority=priority,
policy=self.get_policy(policy_type),
is_outbound=is_out,
remote_port_start=remote_port_start,
remote_port_stop=remote_port_stop,
local_port_start=local_port_start,
local_port_stop=local_port_stop,
)
if remove is False:
spdEntry.add_vpp_config()
self.spd_policies.append(spdEntry)
else:
spdEntry.remove_vpp_config()
self.spd_policies.remove(spdEntry)
self.logger.info(self.vapi.ppcli("show ipsec all"))
return spdEntry
def create_stream(self, src_if, dst_if, pkt_count, src_prt=1234, dst_prt=5678):
packets = []
for i in range(pkt_count):
# create packet info stored in the test case instance
info = self.create_packet_info(src_if, dst_if)
# convert the info into packet payload
payload = self.info_to_payload(info)
# create the packet itself
p = (
Ether(dst=src_if.local_mac, src=src_if.remote_mac)
/ IPv6(src=src_if.remote_ip6, dst=dst_if.remote_ip6)
/ UDP(sport=src_prt, dport=dst_prt)
/ Raw(payload)
)
# store a copy of the packet in the packet info
info.data = p.copy()
# append the packet to the list
packets.append(p)
# return the created packet list
return packets
def verify_capture(self, src_if, dst_if, capture):
packet_info = None
for packet in capture:
try:
ip = packet[IPv6]
udp = packet[UDP]
# convert the payload to packet info object
payload_info = self.payload_to_info(packet)
# make sure the indexes match
self.assert_equal(
payload_info.src, src_if.sw_if_index, "source sw_if_index"
)
self.assert_equal(
payload_info.dst, dst_if.sw_if_index, "destination sw_if_index"
)
packet_info = self.get_next_packet_info_for_interface2(
src_if.sw_if_index, dst_if.sw_if_index, packet_info
)
# make sure we didn't run out of saved packets
self.assertIsNotNone(packet_info)
self.assert_equal(
payload_info.index, packet_info.index, "packet info index"
)
saved_packet = packet_info.data # fetch the saved packet
# assert the values match
self.assert_equal(ip.src, saved_packet[IPv6].src, "IP source address")
# ... more assertions here
self.assert_equal(udp.sport, saved_packet[UDP].sport, "UDP source port")
except Exception as e:
self.logger.error(ppp("Unexpected or invalid packet:", packet))
raise
remaining_packet = self.get_next_packet_info_for_interface2(
src_if.sw_if_index, dst_if.sw_if_index, packet_info
)
self.assertIsNone(
remaining_packet,
"Interface %s: Packet expected from interface "
"%s didn't arrive" % (dst_if.name, src_if.name),
)
def verify_policy_match(self, pkt_count, spdEntry):
self.logger.info("XXXX %s %s", str(spdEntry), str(spdEntry.get_stats()))
matched_pkts = spdEntry.get_stats().get("packets")
self.logger.info("Policy %s matched: %d pkts", str(spdEntry), matched_pkts)
self.assert_equal(pkt_count, matched_pkts)
if __name__ == "__main__":
unittest.main(testRunner=VppTestRunner)