IPSEC: Mutli-tunnel tests

Change-Id: I46f1db6579835c6613fdbb2b726246cc62b135fe
Signed-off-by: Neale Ranns <nranns@cisco.com>
This commit is contained in:
Neale Ranns
2019-03-20 18:24:43 +00:00
committed by Paul Vinciguerra
parent bcee60d570
commit 2ac885c665
5 changed files with 215 additions and 56 deletions

View File

@ -79,6 +79,45 @@ class IPsecIPv6Params(object):
self.nat_header = None self.nat_header = None
def config_tun_params(p, encryption_type, tun_if):
ip_class_by_addr_type = {socket.AF_INET: IP, socket.AF_INET6: IPv6}
p.scapy_tun_sa = SecurityAssociation(
encryption_type, spi=p.vpp_tun_spi,
crypt_algo=p.crypt_algo, crypt_key=p.crypt_key,
auth_algo=p.auth_algo, auth_key=p.auth_key,
tunnel_header=ip_class_by_addr_type[p.addr_type](
src=tun_if.remote_addr[p.addr_type],
dst=tun_if.local_addr[p.addr_type]),
nat_t_header=p.nat_header)
p.vpp_tun_sa = SecurityAssociation(
encryption_type, spi=p.scapy_tun_spi,
crypt_algo=p.crypt_algo, crypt_key=p.crypt_key,
auth_algo=p.auth_algo, auth_key=p.auth_key,
tunnel_header=ip_class_by_addr_type[p.addr_type](
dst=tun_if.remote_addr[p.addr_type],
src=tun_if.local_addr[p.addr_type]),
nat_t_header=p.nat_header)
def config_tra_params(p, encryption_type):
p.scapy_tra_sa = SecurityAssociation(
encryption_type,
spi=p.vpp_tra_spi,
crypt_algo=p.crypt_algo,
crypt_key=p.crypt_key,
auth_algo=p.auth_algo,
auth_key=p.auth_key,
nat_t_header=p.nat_header)
p.vpp_tra_sa = SecurityAssociation(
encryption_type,
spi=p.scapy_tra_spi,
crypt_algo=p.crypt_algo,
crypt_key=p.crypt_key,
auth_algo=p.auth_algo,
auth_key=p.auth_key,
nat_t_header=p.nat_header)
class TemplateIpsec(VppTestCase): class TemplateIpsec(VppTestCase):
""" """
TRANSPORT MODE: TRANSPORT MODE:
@ -164,26 +203,6 @@ class TemplateIpsec(VppTestCase):
ICMPv6EchoRequest(id=0, seq=1, data=self.payload) ICMPv6EchoRequest(id=0, seq=1, data=self.payload)
for i in range(count)] for i in range(count)]
def configure_sa_tun(self, params):
ip_class_by_addr_type = {socket.AF_INET: IP, socket.AF_INET6: IPv6}
scapy_tun_sa = SecurityAssociation(
self.encryption_type, spi=params.vpp_tun_spi,
crypt_algo=params.crypt_algo, crypt_key=params.crypt_key,
auth_algo=params.auth_algo, auth_key=params.auth_key,
tunnel_header=ip_class_by_addr_type[params.addr_type](
src=self.tun_if.remote_addr[params.addr_type],
dst=self.tun_if.local_addr[params.addr_type]),
nat_t_header=params.nat_header)
vpp_tun_sa = SecurityAssociation(
self.encryption_type, spi=params.scapy_tun_spi,
crypt_algo=params.crypt_algo, crypt_key=params.crypt_key,
auth_algo=params.auth_algo, auth_key=params.auth_key,
tunnel_header=ip_class_by_addr_type[params.addr_type](
dst=self.tun_if.remote_addr[params.addr_type],
src=self.tun_if.local_addr[params.addr_type]),
nat_t_header=params.nat_header)
return vpp_tun_sa, scapy_tun_sa
def configure_sa_tra(self, params): def configure_sa_tra(self, params):
params.scapy_tra_sa = SecurityAssociation( params.scapy_tra_sa = SecurityAssociation(
self.encryption_type, self.encryption_type,
@ -208,15 +227,15 @@ class IpsecTcpTests(object):
""" verify checksum correctness for vpp generated packets """ """ verify checksum correctness for vpp generated packets """
self.vapi.cli("test http server") self.vapi.cli("test http server")
p = self.params[socket.AF_INET] p = self.params[socket.AF_INET]
vpp_tun_sa, scapy_tun_sa = self.configure_sa_tun(p) config_tun_params(p, self.encryption_type, self.tun_if)
send = (Ether(src=self.tun_if.remote_mac, dst=self.tun_if.local_mac) / send = (Ether(src=self.tun_if.remote_mac, dst=self.tun_if.local_mac) /
scapy_tun_sa.encrypt(IP(src=p.remote_tun_if_host, p.scapy_tun_sa.encrypt(IP(src=p.remote_tun_if_host,
dst=self.tun_if.local_ip4) / dst=self.tun_if.local_ip4) /
TCP(flags='S', dport=80))) TCP(flags='S', dport=80)))
self.logger.debug(ppp("Sending packet:", send)) self.logger.debug(ppp("Sending packet:", send))
recv = self.send_and_expect(self.tun_if, [send], self.tun_if) recv = self.send_and_expect(self.tun_if, [send], self.tun_if)
recv = recv[0] recv = recv[0]
decrypted = vpp_tun_sa.decrypt(recv[IP]) decrypted = p.vpp_tun_sa.decrypt(recv[IP])
self.assert_packet_checksums_valid(decrypted) self.assert_packet_checksums_valid(decrypted)
@ -374,14 +393,13 @@ class IpsecTra46Tests(IpsecTra4Tests, IpsecTra6Tests):
pass pass
class IpsecTun4Tests(object): class IpsecTun4(object):
def test_tun_basic44(self, count=1):
""" ipsec 4o4 tunnel basic test """ def verify_tun_44(self, p, count=1):
self.vapi.cli("clear errors") self.vapi.cli("clear errors")
try: try:
p = self.params[socket.AF_INET] config_tun_params(p, self.encryption_type, self.tun_if)
vpp_tun_sa, scapy_tun_sa = self.configure_sa_tun(p) send_pkts = self.gen_encrypt_pkts(p.scapy_tun_sa, self.tun_if,
send_pkts = self.gen_encrypt_pkts(scapy_tun_sa, self.tun_if,
src=p.remote_tun_if_host, src=p.remote_tun_if_host,
dst=self.pg1.remote_ip4, dst=self.pg1.remote_ip4,
count=count) count=count)
@ -395,7 +413,7 @@ class IpsecTun4Tests(object):
recv_pkts = self.send_and_expect(self.pg1, send_pkts, self.tun_if) recv_pkts = self.send_and_expect(self.pg1, send_pkts, self.tun_if)
for recv_pkt in recv_pkts: for recv_pkt in recv_pkts:
try: try:
decrypt_pkt = vpp_tun_sa.decrypt(recv_pkt[IP]) decrypt_pkt = p.vpp_tun_sa.decrypt(recv_pkt[IP])
if not decrypt_pkt.haslayer(IP): if not decrypt_pkt.haslayer(IP):
decrypt_pkt = IP(decrypt_pkt[Raw].load) decrypt_pkt = IP(decrypt_pkt[Raw].load)
self.assert_equal(decrypt_pkt.src, self.pg1.remote_ip4) self.assert_equal(decrypt_pkt.src, self.pg1.remote_ip4)
@ -432,19 +450,26 @@ class IpsecTun4Tests(object):
self.assert_packet_counter_equal(self.tun4_encrypt_node_name, count) self.assert_packet_counter_equal(self.tun4_encrypt_node_name, count)
self.assert_packet_counter_equal(self.tun4_decrypt_node_name, count) self.assert_packet_counter_equal(self.tun4_decrypt_node_name, count)
class IpsecTun4Tests(IpsecTun4):
def test_tun_basic44(self):
""" ipsec 4o4 tunnel basic test """
self.verify_tun_44(self.params[socket.AF_INET], count=1)
def test_tun_burst44(self): def test_tun_burst44(self):
""" ipsec 4o4 tunnel burst test """ """ ipsec 4o4 tunnel burst test """
self.test_tun_basic44(count=257) self.verify_tun_44(self.params[socket.AF_INET], count=257)
class IpsecTun6Tests(object): class IpsecTun6(object):
def test_tun_basic66(self, count=1):
def verify_tun_66(self, p, count=1):
""" ipsec 6o6 tunnel basic test """ """ ipsec 6o6 tunnel basic test """
self.vapi.cli("clear errors") self.vapi.cli("clear errors")
try: try:
p = self.params[socket.AF_INET6] config_tun_params(p, self.encryption_type, self.tun_if)
vpp_tun_sa, scapy_tun_sa = self.configure_sa_tun(p) send_pkts = self.gen_encrypt_pkts6(p.scapy_tun_sa, self.tun_if,
send_pkts = self.gen_encrypt_pkts6(scapy_tun_sa, self.tun_if,
src=p.remote_tun_if_host, src=p.remote_tun_if_host,
dst=self.pg1.remote_ip6, dst=self.pg1.remote_ip6,
count=count) count=count)
@ -459,7 +484,7 @@ class IpsecTun6Tests(object):
recv_pkts = self.send_and_expect(self.pg1, send_pkts, self.tun_if) recv_pkts = self.send_and_expect(self.pg1, send_pkts, self.tun_if)
for recv_pkt in recv_pkts: for recv_pkt in recv_pkts:
try: try:
decrypt_pkt = vpp_tun_sa.decrypt(recv_pkt[IPv6]) decrypt_pkt = p.vpp_tun_sa.decrypt(recv_pkt[IPv6])
if not decrypt_pkt.haslayer(IPv6): if not decrypt_pkt.haslayer(IPv6):
decrypt_pkt = IPv6(decrypt_pkt[Raw].load) decrypt_pkt = IPv6(decrypt_pkt[Raw].load)
self.assert_equal(decrypt_pkt.src, self.pg1.remote_ip6) self.assert_equal(decrypt_pkt.src, self.pg1.remote_ip6)
@ -489,9 +514,16 @@ class IpsecTun6Tests(object):
self.assert_packet_counter_equal(self.tun6_encrypt_node_name, count) self.assert_packet_counter_equal(self.tun6_encrypt_node_name, count)
self.assert_packet_counter_equal(self.tun6_decrypt_node_name, count) self.assert_packet_counter_equal(self.tun6_decrypt_node_name, count)
class IpsecTun6Tests(IpsecTun6):
def test_tun_basic66(self):
""" ipsec 6o6 tunnel basic test """
self.verify_tun_66(self.params[socket.AF_INET6], count=1)
def test_tun_burst66(self): def test_tun_burst66(self):
""" ipsec 6o6 tunnel burst test """ """ ipsec 6o6 tunnel burst test """
self.test_tun_basic66(count=257) self.verify_tun_66(self.params[socket.AF_INET6], count=257)
class IpsecTun46Tests(IpsecTun4Tests, IpsecTun6Tests): class IpsecTun46Tests(IpsecTun4Tests, IpsecTun6Tests):

View File

@ -4,7 +4,8 @@ import unittest
from scapy.layers.ipsec import AH from scapy.layers.ipsec import AH
from framework import VppTestRunner from framework import VppTestRunner
from template_ipsec import TemplateIpsec, IpsecTra46Tests, IpsecTun46Tests from template_ipsec import TemplateIpsec, IpsecTra46Tests, IpsecTun46Tests, \
config_tun_params, config_tra_params
from template_ipsec import IpsecTcpTests from template_ipsec import IpsecTcpTests
from vpp_ipsec import VppIpsecSA, VppIpsecSpd, VppIpsecSpdEntry,\ from vpp_ipsec import VppIpsecSA, VppIpsecSpd, VppIpsecSpdEntry,\
VppIpsecSpdItfBinding VppIpsecSpdItfBinding
@ -53,7 +54,7 @@ class TemplateIpsecAh(TemplateIpsec):
for _, p in self.params.items(): for _, p in self.params.items():
self.config_ah_tra(p) self.config_ah_tra(p)
self.configure_sa_tra(p) config_tra_params(p, self.encryption_type)
self.logger.info(self.vapi.ppcli("show ipsec")) self.logger.info(self.vapi.ppcli("show ipsec"))
for _, p in self.params.items(): for _, p in self.params.items():
self.config_ah_tun(p) self.config_ah_tun(p)

View File

@ -5,7 +5,7 @@ from scapy.layers.inet import UDP
from framework import VppTestRunner from framework import VppTestRunner
from template_ipsec import IpsecTra46Tests, IpsecTun46Tests, TemplateIpsec, \ from template_ipsec import IpsecTra46Tests, IpsecTun46Tests, TemplateIpsec, \
IpsecTcpTests, IpsecTun4Tests, IpsecTra4Tests IpsecTcpTests, IpsecTun4Tests, IpsecTra4Tests, config_tra_params
from vpp_ipsec import VppIpsecSpd, VppIpsecSpdEntry, VppIpsecSA,\ from vpp_ipsec import VppIpsecSpd, VppIpsecSpdEntry, VppIpsecSA,\
VppIpsecSpdItfBinding VppIpsecSpdItfBinding
from vpp_ip_route import VppIpRoute, VppRoutePath from vpp_ip_route import VppIpRoute, VppRoutePath
@ -177,8 +177,6 @@ class TemplateIpsecEsp(TemplateIpsec):
|pg0| -------> |VPP| ------> |pg1| |pg0| -------> |VPP| ------> |pg1|
--- --- --- --- --- ---
""" """
config_esp_tun = config_esp_tun
config_esp_tra = config_esp_tra
def setUp(self): def setUp(self):
super(TemplateIpsecEsp, self).setUp() super(TemplateIpsecEsp, self).setUp()
@ -193,8 +191,8 @@ class TemplateIpsecEsp(TemplateIpsec):
self.tra_if).add_vpp_config() self.tra_if).add_vpp_config()
for _, p in self.params.items(): for _, p in self.params.items():
self.config_esp_tra(p) config_esp_tra(self, p)
self.configure_sa_tra(p) config_tra_params(p, self.encryption_type)
self.logger.info(self.vapi.ppcli("show ipsec")) self.logger.info(self.vapi.ppcli("show ipsec"))
self.tun_spd = VppIpsecSpd(self, self.tun_spd_id) self.tun_spd = VppIpsecSpd(self, self.tun_spd_id)
@ -203,7 +201,7 @@ class TemplateIpsecEsp(TemplateIpsec):
self.tun_if).add_vpp_config() self.tun_if).add_vpp_config()
for _, p in self.params.items(): for _, p in self.params.items():
self.config_esp_tun(p) config_esp_tun(self, p)
self.logger.info(self.vapi.ppcli("show ipsec")) self.logger.info(self.vapi.ppcli("show ipsec"))
for _, p in self.params.items(): for _, p in self.params.items():
@ -241,9 +239,6 @@ class TemplateIpsecEspUdp(TemplateIpsec):
""" """
UDP encapped ESP UDP encapped ESP
""" """
config_esp_tun = config_esp_tun
config_esp_tra = config_esp_tra
def setUp(self): def setUp(self):
super(TemplateIpsecEspUdp, self).setUp() super(TemplateIpsecEspUdp, self).setUp()
self.encryption_type = ESP self.encryption_type = ESP
@ -261,15 +256,15 @@ class TemplateIpsecEspUdp(TemplateIpsec):
VppIpsecSpdItfBinding(self, self.tra_spd, VppIpsecSpdItfBinding(self, self.tra_spd,
self.tra_if).add_vpp_config() self.tra_if).add_vpp_config()
self.config_esp_tra(p) config_esp_tra(self, p)
self.configure_sa_tra(p) config_tra_params(p, self.encryption_type)
self.tun_spd = VppIpsecSpd(self, self.tun_spd_id) self.tun_spd = VppIpsecSpd(self, self.tun_spd_id)
self.tun_spd.add_vpp_config() self.tun_spd.add_vpp_config()
VppIpsecSpdItfBinding(self, self.tun_spd, VppIpsecSpdItfBinding(self, self.tun_spd,
self.tun_if).add_vpp_config() self.tun_if).add_vpp_config()
self.config_esp_tun(p) config_esp_tun(self, p)
self.logger.info(self.vapi.ppcli("show ipsec")) self.logger.info(self.vapi.ppcli("show ipsec"))
d = DpoProto.DPO_PROTO_IP4 d = DpoProto.DPO_PROTO_IP4

View File

@ -1,9 +1,10 @@
import unittest import unittest
import socket import socket
import copy
from scapy.layers.ipsec import ESP from scapy.layers.ipsec import ESP
from framework import VppTestRunner from framework import VppTestRunner
from template_ipsec import TemplateIpsec, IpsecTun4Tests, IpsecTun6Tests, \ from template_ipsec import TemplateIpsec, IpsecTun4Tests, IpsecTun6Tests, \
IpsecTcpTests IpsecTun4, IpsecTun6, IpsecTcpTests, config_tun_params
from vpp_ipsec_tun_interface import VppIpsecTunInterface from vpp_ipsec_tun_interface import VppIpsecTunInterface
from vpp_ip_route import VppIpRoute, VppRoutePath, DpoProto from vpp_ip_route import VppIpRoute, VppRoutePath, DpoProto
@ -69,7 +70,7 @@ class TemplateIpsec6TunIfEsp(TemplateIpsec):
tun_if.admin_up() tun_if.admin_up()
tun_if.config_ip6() tun_if.config_ip6()
VppIpRoute(self, p.remote_tun_if_host, 32, VppIpRoute(self, p.remote_tun_if_host, 128,
[VppRoutePath(tun_if.remote_ip6, [VppRoutePath(tun_if.remote_ip6,
0xffffffff, 0xffffffff,
proto=DpoProto.DPO_PROTO_IP6)], proto=DpoProto.DPO_PROTO_IP6)],
@ -87,5 +88,127 @@ class TestIpsec6TunIfEsp1(TemplateIpsec6TunIfEsp, IpsecTun6Tests):
tun6_decrypt_node_name = "esp6-decrypt" tun6_decrypt_node_name = "esp6-decrypt"
class TestIpsec4MultiTunIfEsp(TemplateIpsec, IpsecTun4):
""" IPsec IPv4 Multi Tunnel interface """
encryption_type = ESP
tun4_encrypt_node_name = "esp4-encrypt"
tun4_decrypt_node_name = "esp4-decrypt"
def setUp(self):
super(TestIpsec4MultiTunIfEsp, self).setUp()
self.tun_if = self.pg0
self.multi_params = []
for ii in range(10):
p = copy.copy(self.ipv4_params)
p.remote_tun_if_host = "1.1.1.%d" % (ii + 1)
p.scapy_tun_sa_id = p.scapy_tun_sa_id + ii
p.scapy_tun_spi = p.scapy_tun_spi + ii
p.vpp_tun_sa_id = p.vpp_tun_sa_id + ii
p.vpp_tun_spi = p.vpp_tun_spi + ii
p.scapy_tra_sa_id = p.scapy_tra_sa_id + ii
p.scapy_tra_spi = p.scapy_tra_spi + ii
p.vpp_tra_sa_id = p.vpp_tra_sa_id + ii
p.vpp_tra_spi = p.vpp_tra_spi + ii
config_tun_params(p, self.encryption_type, self.tun_if)
self.multi_params.append(p)
p.tun_if = VppIpsecTunInterface(self, self.pg0, p.vpp_tun_spi,
p.scapy_tun_spi,
p.crypt_algo_vpp_id,
p.crypt_key, p.crypt_key,
p.auth_algo_vpp_id, p.auth_key,
p.auth_key)
p.tun_if.add_vpp_config()
p.tun_if.admin_up()
p.tun_if.config_ip4()
VppIpRoute(self, p.remote_tun_if_host, 32,
[VppRoutePath(p.tun_if.remote_ip4,
0xffffffff)]).add_vpp_config()
def tearDown(self):
if not self.vpp_dead:
self.vapi.cli("show hardware")
super(TestIpsec4MultiTunIfEsp, self).tearDown()
def test_tun_44(self):
"""Multiple IPSEC tunnel interfaces """
for p in self.multi_params:
self.verify_tun_44(p, count=127)
c = p.tun_if.get_rx_stats()
self.assertEqual(c['packets'], 127)
c = p.tun_if.get_tx_stats()
self.assertEqual(c['packets'], 127)
class TestIpsec6MultiTunIfEsp(TemplateIpsec, IpsecTun6):
""" IPsec IPv6 Muitli Tunnel interface """
encryption_type = ESP
tun6_encrypt_node_name = "esp6-encrypt"
tun6_decrypt_node_name = "esp6-decrypt"
def setUp(self):
super(TestIpsec6MultiTunIfEsp, self).setUp()
self.tun_if = self.pg0
self.multi_params = []
for ii in range(10):
p = copy.copy(self.ipv6_params)
p.remote_tun_if_host = "1111::%d" % (ii + 1)
p.scapy_tun_sa_id = p.scapy_tun_sa_id + ii
p.scapy_tun_spi = p.scapy_tun_spi + ii
p.vpp_tun_sa_id = p.vpp_tun_sa_id + ii
p.vpp_tun_spi = p.vpp_tun_spi + ii
p.scapy_tra_sa_id = p.scapy_tra_sa_id + ii
p.scapy_tra_spi = p.scapy_tra_spi + ii
p.vpp_tra_sa_id = p.vpp_tra_sa_id + ii
p.vpp_tra_spi = p.vpp_tra_spi + ii
config_tun_params(p, self.encryption_type, self.tun_if)
self.multi_params.append(p)
p.tun_if = VppIpsecTunInterface(self, self.pg0, p.vpp_tun_spi,
p.scapy_tun_spi,
p.crypt_algo_vpp_id,
p.crypt_key, p.crypt_key,
p.auth_algo_vpp_id, p.auth_key,
p.auth_key, is_ip6=True)
p.tun_if.add_vpp_config()
p.tun_if.admin_up()
p.tun_if.config_ip6()
VppIpRoute(self, p.remote_tun_if_host, 128,
[VppRoutePath(p.tun_if.remote_ip6,
0xffffffff,
proto=DpoProto.DPO_PROTO_IP6)],
is_ip6=1).add_vpp_config()
def tearDown(self):
if not self.vpp_dead:
self.vapi.cli("show hardware")
super(TestIpsec6MultiTunIfEsp, self).tearDown()
def test_tun_66(self):
"""Multiple IPSEC tunnel interfaces """
for p in self.multi_params:
self.verify_tun_66(p, count=127)
c = p.tun_if.get_rx_stats()
self.assertEqual(c['packets'], 127)
c = p.tun_if.get_tx_stats()
self.assertEqual(c['packets'], 127)
if __name__ == '__main__': if __name__ == '__main__':
unittest.main(testRunner=VppTestRunner) unittest.main(testRunner=VppTestRunner)

View File

@ -463,3 +463,11 @@ class VppInterface(object):
def __str__(self): def __str__(self):
return self.name return self.name
def get_rx_stats(self):
c = self.test.statistics.get_counter("^/if/rx$")
return c[0][self.sw_if_index]
def get_tx_stats(self):
c = self.test.statistics.get_counter("^/if/tx$")
return c[0][self.sw_if_index]