IPSEC: 4o6 and 6o4 for tunnel interfaces
Change-Id: I4d3ba18ab5205317219989de55b6e50d3b1d8a79 Signed-off-by: Neale Ranns <nranns@cisco.com>
This commit is contained in:
@ -21,6 +21,7 @@ class IPsecIPv4Params(object):
|
||||
|
||||
def __init__(self):
|
||||
self.remote_tun_if_host = '1.1.1.1'
|
||||
self.remote_tun_if_host6 = '1111::1'
|
||||
|
||||
self.scapy_tun_sa_id = 10
|
||||
self.scapy_tun_spi = 1001
|
||||
@ -55,6 +56,7 @@ class IPsecIPv6Params(object):
|
||||
|
||||
def __init__(self):
|
||||
self.remote_tun_if_host = '1111:1111:1111:1111:1111:1111:1111:1111'
|
||||
self.remote_tun_if_host4 = '1.1.1.1'
|
||||
|
||||
self.scapy_tun_sa_id = 50
|
||||
self.scapy_tun_spi = 3001
|
||||
@ -213,24 +215,6 @@ class TemplateIpsec(VppTestCase):
|
||||
ICMPv6EchoRequest(id=0, seq=1, data=self.payload)
|
||||
for i in range(count)]
|
||||
|
||||
def configure_sa_tra(self, params):
|
||||
params.scapy_tra_sa = SecurityAssociation(
|
||||
self.encryption_type,
|
||||
spi=params.vpp_tra_spi,
|
||||
crypt_algo=params.crypt_algo,
|
||||
crypt_key=params.crypt_key,
|
||||
auth_algo=params.auth_algo,
|
||||
auth_key=params.auth_key,
|
||||
nat_t_header=params.nat_header)
|
||||
params.vpp_tra_sa = SecurityAssociation(
|
||||
self.encryption_type,
|
||||
spi=params.scapy_tra_spi,
|
||||
crypt_algo=params.crypt_algo,
|
||||
crypt_key=params.crypt_key,
|
||||
auth_algo=params.auth_algo,
|
||||
auth_key=params.auth_key,
|
||||
nat_t_header=params.nat_header)
|
||||
|
||||
|
||||
class IpsecTcpTests(object):
|
||||
def test_tcp_checksum(self):
|
||||
@ -466,6 +450,26 @@ class IpsecTra46Tests(IpsecTra4Tests, IpsecTra6Tests):
|
||||
|
||||
class IpsecTun4(object):
|
||||
|
||||
def verify_counters(self, p, count):
|
||||
if (hasattr(p, "spd_policy_in_any")):
|
||||
pkts = p.spd_policy_in_any.get_stats()['packets']
|
||||
self.assertEqual(pkts, count,
|
||||
"incorrect SPD any policy: expected %d != %d" %
|
||||
(count, pkts))
|
||||
|
||||
if (hasattr(p, "tun_sa_in")):
|
||||
pkts = p.tun_sa_in.get_stats()['packets']
|
||||
self.assertEqual(pkts, count,
|
||||
"incorrect SA in counts: expected %d != %d" %
|
||||
(count, pkts))
|
||||
pkts = p.tun_sa_out.get_stats()['packets']
|
||||
self.assertEqual(pkts, count,
|
||||
"incorrect SA out counts: expected %d != %d" %
|
||||
(count, pkts))
|
||||
|
||||
self.assert_packet_counter_equal(self.tun4_encrypt_node_name, count)
|
||||
self.assert_packet_counter_equal(self.tun4_decrypt_node_name, count)
|
||||
|
||||
def verify_tun_44(self, p, count=1):
|
||||
self.vapi.cli("clear errors")
|
||||
try:
|
||||
@ -502,24 +506,45 @@ class IpsecTun4(object):
|
||||
self.logger.info(self.vapi.ppcli("show error"))
|
||||
self.logger.info(self.vapi.ppcli("show ipsec"))
|
||||
|
||||
if (hasattr(p, "spd_policy_in_any")):
|
||||
pkts = p.spd_policy_in_any.get_stats()['packets']
|
||||
self.assertEqual(pkts, count,
|
||||
"incorrect SPD any policy: expected %d != %d" %
|
||||
(count, pkts))
|
||||
self.verify_counters(p, count)
|
||||
|
||||
if (hasattr(p, "tun_sa_in")):
|
||||
pkts = p.tun_sa_in.get_stats()['packets']
|
||||
self.assertEqual(pkts, count,
|
||||
"incorrect SA in counts: expected %d != %d" %
|
||||
(count, pkts))
|
||||
pkts = p.tun_sa_out.get_stats()['packets']
|
||||
self.assertEqual(pkts, count,
|
||||
"incorrect SA out counts: expected %d != %d" %
|
||||
(count, pkts))
|
||||
def verify_tun_64(self, p, count=1):
|
||||
self.vapi.cli("clear errors")
|
||||
try:
|
||||
config_tun_params(p, self.encryption_type, self.tun_if)
|
||||
send_pkts = self.gen_encrypt_pkts6(p.scapy_tun_sa, self.tun_if,
|
||||
src=p.remote_tun_if_host6,
|
||||
dst=self.pg1.remote_ip6,
|
||||
count=count)
|
||||
recv_pkts = self.send_and_expect(self.tun_if, send_pkts, self.pg1)
|
||||
for recv_pkt in recv_pkts:
|
||||
self.assert_equal(recv_pkt[IPv6].src, p.remote_tun_if_host6)
|
||||
self.assert_equal(recv_pkt[IPv6].dst, self.pg1.remote_ip6)
|
||||
self.assert_packet_checksums_valid(recv_pkt)
|
||||
send_pkts = self.gen_pkts6(self.pg1, src=self.pg1.remote_ip6,
|
||||
dst=p.remote_tun_if_host6, count=count)
|
||||
recv_pkts = self.send_and_expect(self.pg1, send_pkts, self.tun_if)
|
||||
for recv_pkt in recv_pkts:
|
||||
try:
|
||||
decrypt_pkt = p.vpp_tun_sa.decrypt(recv_pkt[IP])
|
||||
if not decrypt_pkt.haslayer(IPv6):
|
||||
decrypt_pkt = IPv6(decrypt_pkt[Raw].load)
|
||||
self.assert_equal(decrypt_pkt.src, self.pg1.remote_ip6)
|
||||
self.assert_equal(decrypt_pkt.dst, p.remote_tun_if_host6)
|
||||
self.assert_packet_checksums_valid(decrypt_pkt)
|
||||
except:
|
||||
self.logger.error(ppp("Unexpected packet:", recv_pkt))
|
||||
try:
|
||||
self.logger.debug(
|
||||
ppp("Decrypted packet:", decrypt_pkt))
|
||||
except:
|
||||
pass
|
||||
raise
|
||||
finally:
|
||||
self.logger.info(self.vapi.ppcli("show error"))
|
||||
self.logger.info(self.vapi.ppcli("show ipsec"))
|
||||
|
||||
self.assert_packet_counter_equal(self.tun4_encrypt_node_name, count)
|
||||
self.assert_packet_counter_equal(self.tun4_decrypt_node_name, count)
|
||||
self.verify_counters(p, count)
|
||||
|
||||
|
||||
class IpsecTun4Tests(IpsecTun4):
|
||||
@ -535,6 +560,19 @@ class IpsecTun4Tests(IpsecTun4):
|
||||
|
||||
class IpsecTun6(object):
|
||||
|
||||
def verify_counters(self, p, count):
|
||||
if (hasattr(p, "tun_sa_in")):
|
||||
pkts = p.tun_sa_in.get_stats()['packets']
|
||||
self.assertEqual(pkts, count,
|
||||
"incorrect SA in counts: expected %d != %d" %
|
||||
(count, pkts))
|
||||
pkts = p.tun_sa_out.get_stats()['packets']
|
||||
self.assertEqual(pkts, count,
|
||||
"incorrect SA out counts: expected %d != %d" %
|
||||
(count, pkts))
|
||||
self.assert_packet_counter_equal(self.tun6_encrypt_node_name, count)
|
||||
self.assert_packet_counter_equal(self.tun6_decrypt_node_name, count)
|
||||
|
||||
def verify_tun_66(self, p, count=1):
|
||||
""" ipsec 6o6 tunnel basic test """
|
||||
self.vapi.cli("clear errors")
|
||||
@ -572,18 +610,46 @@ class IpsecTun6(object):
|
||||
finally:
|
||||
self.logger.info(self.vapi.ppcli("show error"))
|
||||
self.logger.info(self.vapi.ppcli("show ipsec"))
|
||||
self.verify_counters(p, count)
|
||||
|
||||
if (hasattr(p, "tun_sa_in")):
|
||||
pkts = p.tun_sa_in.get_stats()['packets']
|
||||
self.assertEqual(pkts, count,
|
||||
"incorrect SA in counts: expected %d != %d" %
|
||||
(count, pkts))
|
||||
pkts = p.tun_sa_out.get_stats()['packets']
|
||||
self.assertEqual(pkts, count,
|
||||
"incorrect SA out counts: expected %d != %d" %
|
||||
(count, pkts))
|
||||
self.assert_packet_counter_equal(self.tun6_encrypt_node_name, count)
|
||||
self.assert_packet_counter_equal(self.tun6_decrypt_node_name, count)
|
||||
def verify_tun_46(self, p, count=1):
|
||||
""" ipsec 4o6 tunnel basic test """
|
||||
self.vapi.cli("clear errors")
|
||||
try:
|
||||
config_tun_params(p, self.encryption_type, self.tun_if)
|
||||
send_pkts = self.gen_encrypt_pkts(p.scapy_tun_sa, self.tun_if,
|
||||
src=p.remote_tun_if_host4,
|
||||
dst=self.pg1.remote_ip4,
|
||||
count=count)
|
||||
recv_pkts = self.send_and_expect(self.tun_if, send_pkts, self.pg1)
|
||||
for recv_pkt in recv_pkts:
|
||||
self.assert_equal(recv_pkt[IP].src, p.remote_tun_if_host4)
|
||||
self.assert_equal(recv_pkt[IP].dst, self.pg1.remote_ip4)
|
||||
self.assert_packet_checksums_valid(recv_pkt)
|
||||
send_pkts = self.gen_pkts(self.pg1, src=self.pg1.remote_ip4,
|
||||
dst=p.remote_tun_if_host4,
|
||||
count=count)
|
||||
recv_pkts = self.send_and_expect(self.pg1, send_pkts, self.tun_if)
|
||||
for recv_pkt in recv_pkts:
|
||||
try:
|
||||
decrypt_pkt = p.vpp_tun_sa.decrypt(recv_pkt[IPv6])
|
||||
if not decrypt_pkt.haslayer(IP):
|
||||
decrypt_pkt = IP(decrypt_pkt[Raw].load)
|
||||
self.assert_equal(decrypt_pkt.src, self.pg1.remote_ip4)
|
||||
self.assert_equal(decrypt_pkt.dst, p.remote_tun_if_host4)
|
||||
self.assert_packet_checksums_valid(decrypt_pkt)
|
||||
except:
|
||||
self.logger.debug(ppp("Unexpected packet:", recv_pkt))
|
||||
try:
|
||||
self.logger.debug(ppp("Decrypted packet:",
|
||||
decrypt_pkt))
|
||||
except:
|
||||
pass
|
||||
raise
|
||||
finally:
|
||||
self.logger.info(self.vapi.ppcli("show error"))
|
||||
self.logger.info(self.vapi.ppcli("show ipsec"))
|
||||
self.verify_counters(p, count)
|
||||
|
||||
|
||||
class IpsecTun6Tests(IpsecTun6):
|
||||
|
Reference in New Issue
Block a user