ipsec: modify IPsec related tests to send and verify UDP-encapsulated ESP traffics

In this patch, IPsec related test files have been modified to send UDP-encapsulated
ESP packets,and validate against Inbound and Outbound policies that are configured
with Bypass, Discard and Protect action.

Type: test

Change-Id: I4b8da18270fd177868223bfe1389dc9c50e86cc5
Signed-off-by: vinay Tripathi <vinayx.tripathi@intel.com>
This commit is contained in:
vinay Tripathi
2023-10-20 05:20:47 +00:00
committed by Fan Zhang
parent 75069cee95
commit bc5f530599
3 changed files with 162 additions and 12 deletions

View File

@ -3,7 +3,7 @@ import unittest
from util import ppp
from asfframework import VppTestRunner
from template_ipsec import IPSecIPv4Fwd
from template_ipsec import IpsecDefaultTemplate
"""
When an IPSec SPD is configured on an interface, any inbound packets
@ -32,7 +32,7 @@ packets are dropped as expected.
"""
class IPSecInboundDefaultDrop(IPSecIPv4Fwd):
class IPSecInboundDefaultDrop(IpsecDefaultTemplate):
"""IPSec: inbound packets drop by default with no matching rule"""
def test_ipsec_inbound_default_drop(self):
@ -114,7 +114,7 @@ class IPSecInboundDefaultDrop(IPSecIPv4Fwd):
self.verify_policy_match(pkt_count, inbound_policy)
class IPSecOutboundDefaultDrop(IPSecIPv4Fwd):
class IPSecOutboundDefaultDrop(IpsecDefaultTemplate):
"""IPSec: outbound packets drop by default with no matching rule"""
def test_ipsec_inbound_default_drop(self):

View File

@ -2806,20 +2806,48 @@ class IPSecIPv4Fwd(VppTestCase):
self.logger.info(self.vapi.ppcli("show ipsec all"))
return spdEntry
def create_stream(self, src_if, dst_if, pkt_count, src_prt=1234, dst_prt=5678):
def create_stream(
self, src_if, dst_if, pkt_count, src_prt=1234, dst_prt=5678, proto="UDP"
):
packets = []
# create SA
sa = SecurityAssociation(
ESP,
spi=1000,
crypt_algo="AES-CBC",
crypt_key=b"JPjyOWBeVEQiMe7h",
auth_algo="HMAC-SHA1-96",
auth_key=b"C91KUR9GYMm5GfkEvNjX",
tunnel_header=IP(src=src_if.remote_ip4, dst=dst_if.remote_ip4),
nat_t_header=UDP(sport=src_prt, dport=dst_prt),
)
for i in range(pkt_count):
# create packet info stored in the test case instance
info = self.create_packet_info(src_if, dst_if)
# convert the info into packet payload
payload = self.info_to_payload(info)
# create the packet itself
p = (
Ether(dst=src_if.local_mac, src=src_if.remote_mac)
/ IP(src=src_if.remote_ip4, dst=dst_if.remote_ip4)
/ UDP(sport=src_prt, dport=dst_prt)
/ Raw(payload)
)
p = []
if proto == "UDP-ESP":
p = Ether(dst=src_if.local_mac, src=src_if.remote_mac) / sa.encrypt(
IP(src=src_if.remote_ip4, dst=dst_if.remote_ip4)
/ UDP(sport=src_prt, dport=dst_prt)
/ Raw(payload)
)
elif proto == "UDP":
p = (
Ether(dst=src_if.local_mac, src=src_if.remote_mac)
/ IP(src=src_if.remote_ip4, dst=dst_if.remote_ip4)
/ UDP(sport=src_prt, dport=dst_prt)
/ Raw(payload)
)
elif proto == "TCP":
p = (
Ether(dst=src_if.local_mac, src=src_if.remote_mac)
/ IP(src=src_if.remote_ip4, dst=dst_if.remote_ip4)
/ TCP(sport=src_prt, dport=dst_prt)
/ Raw(payload)
)
# store a copy of the packet in the packet info
info.data = p.copy()
# append the packet to the list
@ -2873,6 +2901,50 @@ class IPSecIPv4Fwd(VppTestCase):
self.logger.info("Policy %s matched: %d pkts", str(spdEntry), matched_pkts)
self.assert_equal(pkt_count, matched_pkts)
# Method verify_l3_l4_capture() will verify network and transport layer
# fields of the packet sa.encrypt() gives interface number garbadge.
# thus interface validation get failed (scapy bug?). However our intent
# is to verify IP layer and above and that is covered.
def verify_l3_l4_capture(
self, src_if, dst_if, capture, tcp_port_in=1234, udp_port_in=5678
):
for packet in capture:
try:
self.assert_packet_checksums_valid(packet)
self.assert_equal(
packet[IP].src,
src_if.remote_ip4,
"decrypted packet source address",
)
self.assert_equal(
packet[IP].dst,
dst_if.remote_ip4,
"decrypted packet destination address",
)
if packet.haslayer(TCP):
self.assertFalse(
packet.haslayer(UDP),
"unexpected UDP header in decrypted packet",
)
elif packet.haslayer(UDP):
if packet[UDP].payload:
self.assertFalse(
packet[UDP][1].haslayer(UDP),
"unexpected UDP header in decrypted packet",
)
else:
self.assertFalse(
packet.haslayer(UDP),
"unexpected UDP header in decrypted packet",
)
self.assert_equal(
packet[ICMP].id, self.icmp_id_in, "decrypted packet ICMP ID"
)
except Exception:
self.logger.error(ppp("Unexpected or invalid plain packet:", packet))
raise
class SpdFlowCacheTemplate(IPSecIPv4Fwd):
@classmethod
@ -2948,6 +3020,84 @@ class SpdFlowCacheTemplate(IPSecIPv4Fwd):
self.logger.info("\ncrc32 NOT supported:\n" + cpu_info)
return False
def create_stream(
cls, src_if, dst_if, pkt_count, src_prt=1234, dst_prt=5678, proto="UDP-ESP"
):
packets = []
packets = super(SpdFlowCacheTemplate, cls).create_stream(
src_if, dst_if, pkt_count, src_prt, dst_prt, proto
)
return packets
def verify_capture(
self, src_if, dst_if, capture, tcp_port_in=1234, udp_port_in=5678
):
super(SpdFlowCacheTemplate, self).verify_l3_l4_capture(
src_if, dst_if, capture, tcp_port_in, udp_port_in
)
class SpdFastPathTemplate(IPSecIPv4Fwd):
@classmethod
def setUpConstants(cls):
super(SpdFastPathTemplate, cls).setUpConstants()
# Override this method with required cmdline parameters e.g.
# cls.vpp_cmdline.extend(["ipsec", "{",
# "ipv4-outbound-spd-flow-cache on",
# "}"])
# cls.logger.info("VPP modified cmdline is %s" % " "
# .join(cls.vpp_cmdline))
def setUp(self):
super(SpdFastPathTemplate, self).setUp()
def tearDown(self):
super(SpdFastPathTemplate, self).tearDown()
def create_stream(
cls, src_if, dst_if, pkt_count, src_prt=1234, dst_prt=5678, proto="UDP-ESP"
):
packets = []
packets = super(SpdFastPathTemplate, cls).create_stream(
src_if, dst_if, pkt_count, src_prt, dst_prt, proto
)
return packets
def verify_capture(
self, src_if, dst_if, capture, tcp_port_in=1234, udp_port_in=5678
):
super(SpdFastPathTemplate, self).verify_l3_l4_capture(
src_if, dst_if, capture, tcp_port_in, udp_port_in
)
class IpsecDefaultTemplate(IPSecIPv4Fwd):
@classmethod
def setUpConstants(cls):
super(IpsecDefaultTemplate, cls).setUpConstants()
def setUp(self):
super(IpsecDefaultTemplate, self).setUp()
def tearDown(self):
super(IpsecDefaultTemplate, self).tearDown()
def create_stream(
cls, src_if, dst_if, pkt_count, src_prt=1234, dst_prt=5678, proto="UDP-ESP"
):
packets = []
packets = super(IpsecDefaultTemplate, cls).create_stream(
src_if, dst_if, pkt_count, src_prt, dst_prt, proto
)
return packets
def verify_capture(
self, src_if, dst_if, capture, tcp_port_in=1234, udp_port_in=5678
):
super(IpsecDefaultTemplate, self).verify_l3_l4_capture(
src_if, dst_if, capture, tcp_port_in, udp_port_in
)
class IPSecIPv6Fwd(VppTestCase):
"""Test IPSec by capturing and verifying IPv6 forwarded pkts"""

View File

@ -4,9 +4,9 @@ import ipaddress
from util import ppp
from framework import VppTestRunner
from template_ipsec import IPSecIPv4Fwd
from template_ipsec import IPSecIPv6Fwd
from test_ipsec_esp import TemplateIpsecEsp
from template_ipsec import SpdFastPathTemplate
def debug_signal_handler(signal, frame):
@ -20,7 +20,7 @@ import signal
signal.signal(signal.SIGINT, debug_signal_handler)
class SpdFastPathInbound(IPSecIPv4Fwd):
class SpdFastPathInbound(SpdFastPathTemplate):
# In test cases derived from this class, packets in IPv4 FWD path
# are configured to go through IPSec inbound SPD policy lookup.
# Note that order in which the rules are applied is