ipsec: ipsec-tun protect
please consult the new tunnel proposal at: https://wiki.fd.io/view/VPP/IPSec Type: feature Change-Id: I52857fc92ae068b85f59be08bdbea1bd5932e291 Signed-off-by: Neale Ranns <nranns@cisco.com>
This commit is contained in:

committed by
Damjan Marion

parent
097fa66b98
commit
c87b66c862
@ -656,59 +656,85 @@ class IpsecTun4Tests(IpsecTun4):
|
||||
|
||||
class IpsecTun6(object):
|
||||
""" verify methods for Tunnel v6 """
|
||||
def verify_counters6(self, p, count):
|
||||
if (hasattr(p, "tun_sa_in")):
|
||||
pkts = p.tun_sa_in.get_stats()['packets']
|
||||
def verify_counters6(self, p_in, p_out, count):
|
||||
if (hasattr(p_in, "tun_sa_in")):
|
||||
pkts = p_in.tun_sa_in.get_stats()['packets']
|
||||
self.assertEqual(pkts, count,
|
||||
"incorrect SA in counts: expected %d != %d" %
|
||||
(count, pkts))
|
||||
pkts = p.tun_sa_out.get_stats()['packets']
|
||||
if (hasattr(p_out, "tun_sa_out")):
|
||||
pkts = p_out.tun_sa_out.get_stats()['packets']
|
||||
self.assertEqual(pkts, count,
|
||||
"incorrect SA out counts: expected %d != %d" %
|
||||
(count, pkts))
|
||||
self.assert_packet_counter_equal(self.tun6_encrypt_node_name, count)
|
||||
self.assert_packet_counter_equal(self.tun6_decrypt_node_name, count)
|
||||
|
||||
def verify_tun_66(self, p, count=1):
|
||||
""" ipsec 6o6 tunnel basic test """
|
||||
def verify_decrypted6(self, p, rxs):
|
||||
for rx in rxs:
|
||||
self.assert_equal(rx[IPv6].src, p.remote_tun_if_host)
|
||||
self.assert_equal(rx[IPv6].dst, self.pg1.remote_ip6)
|
||||
self.assert_packet_checksums_valid(rx)
|
||||
|
||||
def verify_encrypted6(self, p, sa, rxs):
|
||||
for rx in rxs:
|
||||
self.assert_packet_checksums_valid(rx)
|
||||
self.assertEqual(len(rx) - len(Ether()) - len(IPv6()),
|
||||
rx[IPv6].plen)
|
||||
try:
|
||||
decrypt_pkt = p.vpp_tun_sa.decrypt(rx[IPv6])
|
||||
if not decrypt_pkt.haslayer(IPv6):
|
||||
decrypt_pkt = IPv6(decrypt_pkt[Raw].load)
|
||||
self.assert_packet_checksums_valid(decrypt_pkt)
|
||||
self.assert_equal(decrypt_pkt.src, self.pg1.remote_ip6)
|
||||
self.assert_equal(decrypt_pkt.dst, p.remote_tun_if_host)
|
||||
except:
|
||||
self.logger.debug(ppp("Unexpected packet:", rx))
|
||||
try:
|
||||
self.logger.debug(ppp("Decrypted packet:", decrypt_pkt))
|
||||
except:
|
||||
pass
|
||||
raise
|
||||
|
||||
def verify_drop_tun_66(self, p_in, count=1, payload_size=64):
|
||||
self.vapi.cli("clear errors")
|
||||
self.vapi.cli("clear ipsec sa")
|
||||
|
||||
config_tun_params(p_in, self.encryption_type, self.tun_if)
|
||||
send_pkts = self.gen_encrypt_pkts6(p_in.scapy_tun_sa, self.tun_if,
|
||||
src=p_in.remote_tun_if_host,
|
||||
dst=self.pg1.remote_ip6,
|
||||
count=count)
|
||||
self.send_and_assert_no_replies(self.tun_if, send_pkts)
|
||||
self.logger.info(self.vapi.cli("sh punt stats"))
|
||||
|
||||
def verify_tun_66(self, p_in, p_out=None, count=1, payload_size=64):
|
||||
self.vapi.cli("clear errors")
|
||||
self.vapi.cli("clear ipsec sa")
|
||||
if not p_out:
|
||||
p_out = p_in
|
||||
try:
|
||||
config_tun_params(p, self.encryption_type, self.tun_if)
|
||||
send_pkts = self.gen_encrypt_pkts6(p.scapy_tun_sa, self.tun_if,
|
||||
src=p.remote_tun_if_host,
|
||||
config_tun_params(p_in, self.encryption_type, self.tun_if)
|
||||
config_tun_params(p_out, self.encryption_type, self.tun_if)
|
||||
send_pkts = self.gen_encrypt_pkts6(p_in.scapy_tun_sa, self.tun_if,
|
||||
src=p_in.remote_tun_if_host,
|
||||
dst=self.pg1.remote_ip6,
|
||||
count=count)
|
||||
recv_pkts = self.send_and_expect(self.tun_if, send_pkts, self.pg1)
|
||||
for recv_pkt in recv_pkts:
|
||||
self.assert_equal(recv_pkt[IPv6].src, p.remote_tun_if_host)
|
||||
self.assert_equal(recv_pkt[IPv6].dst, self.pg1.remote_ip6)
|
||||
self.assert_packet_checksums_valid(recv_pkt)
|
||||
self.verify_decrypted6(p_in, recv_pkts)
|
||||
|
||||
send_pkts = self.gen_pkts6(self.pg1, src=self.pg1.remote_ip6,
|
||||
dst=p.remote_tun_if_host,
|
||||
count=count)
|
||||
recv_pkts = self.send_and_expect(self.pg1, send_pkts, self.tun_if)
|
||||
for recv_pkt in recv_pkts:
|
||||
self.assertEqual(len(recv_pkt) - len(Ether()) - len(IPv6()),
|
||||
recv_pkt[IPv6].plen)
|
||||
try:
|
||||
decrypt_pkt = p.vpp_tun_sa.decrypt(recv_pkt[IPv6])
|
||||
if not decrypt_pkt.haslayer(IPv6):
|
||||
decrypt_pkt = IPv6(decrypt_pkt[Raw].load)
|
||||
self.assert_equal(decrypt_pkt.src, self.pg1.remote_ip6)
|
||||
self.assert_equal(decrypt_pkt.dst, p.remote_tun_if_host)
|
||||
self.assert_packet_checksums_valid(decrypt_pkt)
|
||||
except:
|
||||
self.logger.debug(ppp("Unexpected packet:", recv_pkt))
|
||||
try:
|
||||
self.logger.debug(
|
||||
ppp("Decrypted packet:", decrypt_pkt))
|
||||
except:
|
||||
pass
|
||||
raise
|
||||
dst=p_out.remote_tun_if_host,
|
||||
count=count,
|
||||
payload_size=payload_size)
|
||||
recv_pkts = self.send_and_expect(self.pg1, send_pkts,
|
||||
self.tun_if)
|
||||
self.verify_encrypted6(p_out, p_out.vpp_tun_sa, recv_pkts)
|
||||
|
||||
finally:
|
||||
self.logger.info(self.vapi.ppcli("show error"))
|
||||
self.logger.info(self.vapi.ppcli("show ipsec all"))
|
||||
self.verify_counters6(p, count)
|
||||
self.verify_counters6(p_in, p_out, count)
|
||||
|
||||
def verify_tun_46(self, p, count=1):
|
||||
""" ipsec 4o6 tunnel basic test """
|
||||
@ -747,7 +773,7 @@ class IpsecTun6(object):
|
||||
finally:
|
||||
self.logger.info(self.vapi.ppcli("show error"))
|
||||
self.logger.info(self.vapi.ppcli("show ipsec all"))
|
||||
self.verify_counters6(p, count)
|
||||
self.verify_counters6(p, p, count)
|
||||
|
||||
|
||||
class IpsecTun6Tests(IpsecTun6):
|
||||
|
Reference in New Issue
Block a user