ipsec: Targeted unit testing

Type: fix

1 - big packets; chained buffers and those without enoguh space to add
ESP header
2 - IPv6 extension headers in packets that are encrypted/decrypted
3 - Interface protection with SAs that have null algorithms

Signed-off-by: Neale Ranns <nranns@cisco.com>
Change-Id: Ie330861fb06a9b248d9dcd5c730e21326ac8e973
This commit is contained in:
Neale Ranns
2019-12-20 00:54:57 +00:00
parent 2f04cb9f14
commit 02950406c4
9 changed files with 595 additions and 107 deletions

View File

@ -6,7 +6,9 @@ from scapy.layers.inet import IP, ICMP, TCP, UDP
from scapy.layers.ipsec import SecurityAssociation, ESP
from scapy.layers.l2 import Ether
from scapy.packet import Raw
from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest
from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest, IPv6ExtHdrHopByHop, \
IPv6ExtHdrFragment, IPv6ExtHdrDestOpt
from framework import VppTestCase, VppTestRunner
from util import ppp, reassemble4, fragment_rfc791, fragment_rfc8200
@ -641,6 +643,108 @@ class IpsecTra6(object):
self.assert_packet_counter_equal(self.tra6_encrypt_node_name, count)
self.assert_packet_counter_equal(self.tra6_decrypt_node_name, count)
def gen_encrypt_pkts_ext_hdrs6(self, sa, sw_intf, src, dst, count=1,
payload_size=54):
return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
sa.encrypt(IPv6(src=src, dst=dst) /
ICMPv6EchoRequest(id=0, seq=1,
data='X' * payload_size))
for i in range(count)]
def gen_pkts_ext_hdrs6(self, sw_intf, src, dst, count=1, payload_size=54):
return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
IPv6(src=src, dst=dst) /
IPv6ExtHdrHopByHop() /
IPv6ExtHdrFragment(id=2, offset=200) /
Raw(b'\xff' * 200)
for i in range(count)]
def verify_tra_encrypted6(self, p, sa, rxs):
decrypted = []
for rx in rxs:
self.assert_packet_checksums_valid(rx)
try:
decrypt_pkt = p.vpp_tra_sa.decrypt(rx[IPv6])
decrypted.append(decrypt_pkt)
self.assert_equal(decrypt_pkt.src, self.tra_if.local_ip6)
self.assert_equal(decrypt_pkt.dst, self.tra_if.remote_ip6)
except:
self.logger.debug(ppp("Unexpected packet:", rx))
try:
self.logger.debug(ppp("Decrypted packet:", decrypt_pkt))
except:
pass
raise
return decrypted
def verify_tra_66_ext_hdrs(self, p):
count = 63
#
# check we can decrypt with options
#
tx = self.gen_encrypt_pkts_ext_hdrs6(p.scapy_tra_sa, self.tra_if,
src=self.tra_if.remote_ip6,
dst=self.tra_if.local_ip6,
count=count)
self.send_and_expect(self.tra_if, tx, self.tra_if)
#
# injecting a packet from ourselves to be routed of box is a hack
# but it matches an outbout policy, alors je ne regrette rien
#
# one extension before ESP
tx = (Ether(src=self.pg2.remote_mac, dst=self.pg2.local_mac) /
IPv6(src=self.tra_if.local_ip6,
dst=self.tra_if.remote_ip6) /
IPv6ExtHdrFragment(id=2, offset=200) /
Raw(b'\xff' * 200))
rxs = self.send_and_expect(self.pg2, [tx], self.tra_if)
dcs = self.verify_tra_encrypted6(p, p.vpp_tra_sa, rxs)
for dc in dcs:
# for reasons i'm not going to investigate scapy does not
# created the correct headers after decrypt. but reparsing
# the ipv6 packet fixes it
dc = IPv6(raw(dc[IPv6]))
self.assert_equal(dc[IPv6ExtHdrFragment].id, 2)
# two extensions before ESP
tx = (Ether(src=self.pg2.remote_mac, dst=self.pg2.local_mac) /
IPv6(src=self.tra_if.local_ip6,
dst=self.tra_if.remote_ip6) /
IPv6ExtHdrHopByHop() /
IPv6ExtHdrFragment(id=2, offset=200) /
Raw(b'\xff' * 200))
rxs = self.send_and_expect(self.pg2, [tx], self.tra_if)
dcs = self.verify_tra_encrypted6(p, p.vpp_tra_sa, rxs)
for dc in dcs:
dc = IPv6(raw(dc[IPv6]))
self.assertTrue(dc[IPv6ExtHdrHopByHop])
self.assert_equal(dc[IPv6ExtHdrFragment].id, 2)
# two extensions before ESP, one after
tx = (Ether(src=self.pg2.remote_mac, dst=self.pg2.local_mac) /
IPv6(src=self.tra_if.local_ip6,
dst=self.tra_if.remote_ip6) /
IPv6ExtHdrHopByHop() /
IPv6ExtHdrFragment(id=2, offset=200) /
IPv6ExtHdrDestOpt() /
Raw(b'\xff' * 200))
rxs = self.send_and_expect(self.pg2, [tx], self.tra_if)
dcs = self.verify_tra_encrypted6(p, p.vpp_tra_sa, rxs)
for dc in dcs:
dc = IPv6(raw(dc[IPv6]))
self.assertTrue(dc[IPv6ExtHdrDestOpt])
self.assertTrue(dc[IPv6ExtHdrHopByHop])
self.assert_equal(dc[IPv6ExtHdrFragment].id, 2)
class IpsecTra6Tests(IpsecTra6):
""" UT test methods for Transport v6 """
@ -653,6 +757,12 @@ class IpsecTra6Tests(IpsecTra6):
self.verify_tra_basic6(count=257)
class IpsecTra6ExtTests(IpsecTra6):
def test_tra_ext_hdrs_66(self):
""" ipsec 6o6 tra extension headers test """
self.verify_tra_66_ext_hdrs(self.params[socket.AF_INET6])
class IpsecTra46Tests(IpsecTra4Tests, IpsecTra6Tests):
""" UT test methods for Transport v6 and v4"""
pass
@ -715,6 +825,7 @@ class IpsecTun4(object):
def verify_tun_44(self, p, count=1, payload_size=64, n_rx=None):
self.vapi.cli("clear errors")
self.vapi.cli("clear ipsec counters")
if not n_rx:
n_rx = count
try:
@ -736,8 +847,45 @@ class IpsecTun4(object):
self.logger.info(self.vapi.ppcli("show error"))
self.logger.info(self.vapi.ppcli("show ipsec all"))
self.logger.info(self.vapi.ppcli("show ipsec sa 0"))
self.logger.info(self.vapi.ppcli("show ipsec sa 4"))
self.verify_counters4(p, count, n_rx)
""" verify methods for Transport v4 """
def verify_tun_44_bad_packet_sizes(self, p):
# with a buffer size of 2048, 1989 bytes of payload
# means there isn't space to insert the ESP header
N_PKTS = 63
for p_siz in [1989, 8500]:
send_pkts = self.gen_encrypt_pkts(p.scapy_tun_sa, self.tun_if,
src=p.remote_tun_if_host,
dst=self.pg1.remote_ip4,
count=N_PKTS,
payload_size=p_siz)
self.send_and_assert_no_replies(self.tun_if, send_pkts)
send_pkts = self.gen_pkts(self.pg1, src=self.pg1.remote_ip4,
dst=p.remote_tun_if_host, count=N_PKTS,
payload_size=p_siz)
self.send_and_assert_no_replies(self.pg1, send_pkts,
self.tun_if)
# both large packets on decrpyt count against chained buffers
# the 9000 bytes one does on encrypt
self.assertEqual(2 * N_PKTS,
self.statistics.get_err_counter(
'/err/%s/chained buffers (packet dropped)' %
self.tun4_decrypt_node_name))
self.assertEqual(N_PKTS,
self.statistics.get_err_counter(
'/err/%s/chained buffers (packet dropped)' %
self.tun4_encrypt_node_name))
# on encrypt the 1989 size is no trailer space
self.assertEqual(N_PKTS,
self.statistics.get_err_counter(
'/err/%s/no trailer space (packet dropped)' %
self.tun4_encrypt_node_name))
def verify_tun_reass_44(self, p):
self.vapi.cli("clear errors")
self.vapi.ip_reassembly_enable_disable(
@ -828,6 +976,10 @@ class IpsecTun4Tests(IpsecTun4):
def test_tun_basic44(self):
""" ipsec 4o4 tunnel basic test """
self.verify_tun_44(self.params[socket.AF_INET], count=1)
self.tun_if.admin_down()
self.tun_if.resolve_arp()
self.tun_if.admin_up()
self.verify_tun_44(self.params[socket.AF_INET], count=1)
def test_tun_reass_basic44(self):
""" ipsec 4o4 tunnel basic reassembly test """
@ -835,7 +987,13 @@ class IpsecTun4Tests(IpsecTun4):
def test_tun_burst44(self):
""" ipsec 4o4 tunnel burst test """
self.verify_tun_44(self.params[socket.AF_INET], count=257)
self.verify_tun_44(self.params[socket.AF_INET], count=127)
class IpsecTunEsp4Tests(IpsecTun4):
def test_tun_bad_packet_sizes(self):
""" ipsec v4 tunnel bad packet size """
self.verify_tun_44_bad_packet_sizes(self.params[socket.AF_INET])
class IpsecTun6(object):
@ -926,7 +1084,7 @@ class IpsecTun6(object):
src=p.remote_tun_if_host,
dst=self.pg1.remote_ip6,
count=1,
payload_size=1900)
payload_size=1850)
send_pkts = fragment_rfc8200(send_pkts[0], 1, 1400, self.logger)
recv_pkts = self.send_and_expect(self.tun_if, send_pkts,
self.pg1, n_rx=1)