IPSEC Tests: to per-test setup and tearDown

don't do the setup and teardown in class methods so that with
each test the config is added and deleted. that way we test that
delete actually removes state.
more helpful error codes from VPP for existing IPSEC state.

Change-Id: I5de1578f73b935b420d4cdd85aa98d5fdcc682f6
Signed-off-by: Neale Ranns <nranns@cisco.com>
This commit is contained in:
Neale Ranns
2019-01-23 08:16:17 -08:00
committed by Damjan Marion
parent e18b45caeb
commit 8e4a89bf42
7 changed files with 596 additions and 307 deletions

View File

@ -99,9 +99,9 @@ ipsec_add_del_spd (vlib_main_t * vm, u32 spd_id, int is_add)
p = hash_get (im->spd_index_by_spd_id, spd_id);
if (p && is_add)
return VNET_API_ERROR_INVALID_VALUE;
return VNET_API_ERROR_ENTRY_ALREADY_EXISTS;
if (!p && !is_add)
return VNET_API_ERROR_INVALID_VALUE;
return VNET_API_ERROR_NO_SUCH_ENTRY;
if (!is_add) /* delete */
{
@ -441,9 +441,9 @@ ipsec_add_del_sa (vlib_main_t * vm, ipsec_sa_t * new_sa, int is_add)
p = hash_get (im->sa_index_by_sa_id, new_sa->id);
if (p && is_add)
return VNET_API_ERROR_SYSCALL_ERROR_1; /* already exists */
return VNET_API_ERROR_ENTRY_ALREADY_EXISTS;
if (!p && !is_add)
return VNET_API_ERROR_SYSCALL_ERROR_1;
return VNET_API_ERROR_NO_SUCH_ENTRY;
if (!is_add) /* delete */
{

View File

@ -82,39 +82,46 @@ class TemplateIpsec(VppTestCase):
|tun_if| -------> |VPP| ------> |pg1|
------ --- ---
"""
ipv4_params = IPsecIPv4Params()
ipv6_params = IPsecIPv6Params()
params = {ipv4_params.addr_type: ipv4_params,
ipv6_params.addr_type: ipv6_params}
payload = "XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX"
tun_spd_id = 1
tra_spd_id = 2
vpp_esp_protocol = 1
vpp_ah_protocol = 0
@classmethod
def ipsec_select_backend(cls):
def ipsec_select_backend(self):
""" empty method to be overloaded when necessary """
pass
@classmethod
def setUpClass(cls):
super(TemplateIpsec, cls).setUpClass()
cls.create_pg_interfaces(range(3))
cls.interfaces = list(cls.pg_interfaces)
for i in cls.interfaces:
def setUp(self):
super(TemplateIpsec, self).setUp()
self.ipv4_params = IPsecIPv4Params()
self.ipv6_params = IPsecIPv6Params()
self.params = {self.ipv4_params.addr_type: self.ipv4_params,
self.ipv6_params.addr_type: self.ipv6_params}
self.payload = "XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX"\
"XXXXXXXXXXXXXXXXXXXXX"
self.tun_spd_id = 1
self.tra_spd_id = 2
self.vpp_esp_protocol = 1
self.vpp_ah_protocol = 0
self.create_pg_interfaces(range(3))
self.interfaces = list(self.pg_interfaces)
for i in self.interfaces:
i.admin_up()
i.config_ip4()
i.resolve_arp()
i.config_ip6()
i.resolve_ndp()
cls.ipsec_select_backend()
self.ipsec_select_backend()
def tearDown(self):
super(TemplateIpsec, self).tearDown()
for i in self.interfaces:
i.admin_down()
i.unconfig_ip4()
i.unconfig_ip6()
if not self.vpp_dead:
self.vapi.cli("show hardware")
@ -158,15 +165,14 @@ class TemplateIpsec(VppTestCase):
src=self.tun_if.local_addr[params.addr_type]))
return vpp_tun_sa, scapy_tun_sa
@classmethod
def configure_sa_tra(cls, params):
params.scapy_tra_sa = SecurityAssociation(cls.encryption_type,
def configure_sa_tra(self, params):
params.scapy_tra_sa = SecurityAssociation(self.encryption_type,
spi=params.vpp_tra_spi,
crypt_algo=params.crypt_algo,
crypt_key=params.crypt_key,
auth_algo=params.auth_algo,
auth_key=params.auth_key)
params.vpp_tra_sa = SecurityAssociation(cls.encryption_type,
params.vpp_tra_sa = SecurityAssociation(self.encryption_type,
spi=params.scapy_tra_spi,
crypt_algo=params.crypt_algo,
crypt_key=params.crypt_key,

File diff suppressed because it is too large Load Diff

View File

@ -1,47 +1,55 @@
import unittest
from framework import VppTestCase, VppTestRunner
from template_ipsec import TemplateIpsec
from template_ipsec import TemplateIpsec, IPsecIPv4Params
class IpsecApiTestCase(VppTestCase):
""" IPSec API tests """
@classmethod
def setUpClass(cls):
super(IpsecApiTestCase, cls).setUpClass()
cls.create_pg_interfaces([0])
cls.pg0.config_ip4()
cls.pg0.admin_up()
def setUp(self):
super(IpsecApiTestCase, self).setUp()
self.create_pg_interfaces([0])
self.pg0.config_ip4()
self.pg0.admin_up()
self.vpp_esp_protocol = 1
self.vpp_ah_protocol = 0
self.ipv4_params = IPsecIPv4Params()
def tearDown(self):
self.pg0.unconfig_ip4()
self.pg0.admin_down()
super(IpsecApiTestCase, self).tearDown()
def test_backend_dump(self):
""" backend dump """
d = self.vapi.ipsec_backend_dump()
self.assert_equal(len(d), 2, "number of ipsec backends in dump")
self.assert_equal(d[0].protocol, TemplateIpsec.vpp_ah_protocol,
self.assert_equal(d[0].protocol, self.vpp_ah_protocol,
"ipsec protocol in dump entry")
self.assert_equal(d[0].index, 0, "index in dump entry")
self.assert_equal(d[0].active, 1, "active flag in dump entry")
self.assert_equal(d[1].protocol, TemplateIpsec.vpp_esp_protocol,
self.assert_equal(d[1].protocol, self.vpp_esp_protocol,
"ipsec protocol in dump entry")
self.assert_equal(d[1].index, 0, "index in dump entry")
self.assert_equal(d[1].active, 1, "active flag in dump entry")
def test_select_valid_backend(self):
""" select valid backend """
self.vapi.ipsec_select_backend(TemplateIpsec.vpp_ah_protocol, 0)
self.vapi.ipsec_select_backend(TemplateIpsec.vpp_esp_protocol, 0)
self.vapi.ipsec_select_backend(self.vpp_ah_protocol, 0)
self.vapi.ipsec_select_backend(self.vpp_esp_protocol, 0)
def test_select_invalid_backend(self):
""" select invalid backend """
with self.vapi.assert_negative_api_retval():
self.vapi.ipsec_select_backend(TemplateIpsec.vpp_ah_protocol, 200)
self.vapi.ipsec_select_backend(self.vpp_ah_protocol, 200)
with self.vapi.assert_negative_api_retval():
self.vapi.ipsec_select_backend(TemplateIpsec.vpp_esp_protocol, 200)
self.vapi.ipsec_select_backend(self.vpp_esp_protocol, 200)
def test_select_backend_in_use(self):
""" attempt to change backend while sad configured """
params = TemplateIpsec.ipv4_params
params = self.ipv4_params
addr_type = params.addr_type
is_ipv6 = params.is_ipv6
scapy_tun_sa_id = params.scapy_tun_sa_id
@ -54,24 +62,24 @@ class IpsecApiTestCase(VppTestCase):
self.vapi.ipsec_sad_add_del_entry(scapy_tun_sa_id, scapy_tun_spi,
auth_algo_vpp_id, auth_key,
crypt_algo_vpp_id, crypt_key,
TemplateIpsec.vpp_ah_protocol,
self.vpp_ah_protocol,
self.pg0.local_addr_n[addr_type],
self.pg0.remote_addr_n[addr_type],
is_tunnel=1, is_tunnel_ipv6=is_ipv6)
with self.vapi.assert_negative_api_retval():
self.vapi.ipsec_select_backend(
protocol=TemplateIpsec.vpp_ah_protocol, index=0)
protocol=self.vpp_ah_protocol, index=0)
self.vapi.ipsec_sad_add_del_entry(scapy_tun_sa_id, scapy_tun_spi,
auth_algo_vpp_id, auth_key,
crypt_algo_vpp_id, crypt_key,
TemplateIpsec.vpp_ah_protocol,
self.vpp_ah_protocol,
self.pg0.local_addr_n[addr_type],
self.pg0.remote_addr_n[addr_type],
is_tunnel=1, is_tunnel_ipv6=is_ipv6,
is_add=0)
self.vapi.ipsec_select_backend(
protocol=TemplateIpsec.vpp_ah_protocol, index=0)
protocol=self.vpp_ah_protocol, index=0)
if __name__ == '__main__':

File diff suppressed because it is too large Load Diff

View File

@ -31,20 +31,19 @@ class IPSecNATTestCase(TemplateIpsec):
icmp_id_in = 6305
icmp_id_out = 6305
@classmethod
def setUpClass(cls):
super(IPSecNATTestCase, cls).setUpClass()
cls.tun_if = cls.pg0
cls.vapi.ipsec_spd_add_del(cls.tun_spd_id)
cls.vapi.ipsec_interface_add_del_spd(cls.tun_spd_id,
cls.tun_if.sw_if_index)
p = cls.ipv4_params
cls.config_esp_tun(p)
cls.logger.info(cls.vapi.ppcli("show ipsec"))
def setUp(self):
super(IPSecNATTestCase, self).setUp()
self.tun_if = self.pg0
self.vapi.ipsec_spd_add_del(self.tun_spd_id)
self.vapi.ipsec_interface_add_del_spd(self.tun_spd_id,
self.tun_if.sw_if_index)
p = self.ipv4_params
self.config_esp_tun(p)
self.logger.info(self.vapi.ppcli("show ipsec"))
src = socket.inet_pton(p.addr_type, p.remote_tun_if_host)
cls.vapi.ip_add_del_route(src, p.addr_len,
cls.tun_if.remote_addr_n[p.addr_type],
is_ipv6=p.is_ipv6)
self.vapi.ip_add_del_route(src, p.addr_len,
self.tun_if.remote_addr_n[p.addr_type],
is_ipv6=p.is_ipv6)
def create_stream_plain(self, src_mac, dst_mac, src_ip, dst_ip):
return [
@ -131,8 +130,7 @@ class IPSecNATTestCase(TemplateIpsec):
ppp("Unexpected or invalid encrypted packet:", packet))
raise
@classmethod
def config_esp_tun(cls, params):
def config_esp_tun(self, params):
addr_type = params.addr_type
scapy_tun_sa_id = params.scapy_tun_sa_id
scapy_tun_spi = params.scapy_tun_spi
@ -144,50 +142,50 @@ class IPSecNATTestCase(TemplateIpsec):
crypt_key = params.crypt_key
addr_any = params.addr_any
addr_bcast = params.addr_bcast
cls.vapi.ipsec_sad_add_del_entry(scapy_tun_sa_id, scapy_tun_spi,
auth_algo_vpp_id, auth_key,
crypt_algo_vpp_id, crypt_key,
cls.vpp_esp_protocol,
cls.pg1.remote_addr_n[addr_type],
cls.tun_if.remote_addr_n[addr_type],
udp_encap=1)
cls.vapi.ipsec_sad_add_del_entry(vpp_tun_sa_id, vpp_tun_spi,
auth_algo_vpp_id, auth_key,
crypt_algo_vpp_id, crypt_key,
cls.vpp_esp_protocol,
cls.tun_if.remote_addr_n[addr_type],
cls.pg1.remote_addr_n[addr_type],
udp_encap=1)
self.vapi.ipsec_sad_add_del_entry(scapy_tun_sa_id, scapy_tun_spi,
auth_algo_vpp_id, auth_key,
crypt_algo_vpp_id, crypt_key,
self.vpp_esp_protocol,
self.pg1.remote_addr_n[addr_type],
self.tun_if.remote_addr_n[addr_type],
udp_encap=1)
self.vapi.ipsec_sad_add_del_entry(vpp_tun_sa_id, vpp_tun_spi,
auth_algo_vpp_id, auth_key,
crypt_algo_vpp_id, crypt_key,
self.vpp_esp_protocol,
self.tun_if.remote_addr_n[addr_type],
self.pg1.remote_addr_n[addr_type],
udp_encap=1)
l_startaddr = r_startaddr = socket.inet_pton(addr_type, addr_any)
l_stopaddr = r_stopaddr = socket.inet_pton(addr_type, addr_bcast)
cls.vapi.ipsec_spd_add_del_entry(cls.tun_spd_id, scapy_tun_sa_id,
l_startaddr, l_stopaddr, r_startaddr,
r_stopaddr,
protocol=socket.IPPROTO_ESP)
cls.vapi.ipsec_spd_add_del_entry(cls.tun_spd_id, scapy_tun_sa_id,
l_startaddr, l_stopaddr, r_startaddr,
r_stopaddr, is_outbound=0,
protocol=socket.IPPROTO_ESP)
cls.vapi.ipsec_spd_add_del_entry(cls.tun_spd_id, scapy_tun_sa_id,
l_startaddr, l_stopaddr, r_startaddr,
r_stopaddr, remote_port_start=4500,
remote_port_stop=4500,
protocol=socket.IPPROTO_UDP)
cls.vapi.ipsec_spd_add_del_entry(cls.tun_spd_id, scapy_tun_sa_id,
l_startaddr, l_stopaddr, r_startaddr,
r_stopaddr, remote_port_start=4500,
remote_port_stop=4500,
protocol=socket.IPPROTO_UDP,
is_outbound=0)
l_startaddr = l_stopaddr = cls.tun_if.remote_addr_n[addr_type]
r_startaddr = r_stopaddr = cls.pg1.remote_addr_n[addr_type]
cls.vapi.ipsec_spd_add_del_entry(cls.tun_spd_id, vpp_tun_sa_id,
l_startaddr, l_stopaddr, r_startaddr,
r_stopaddr, priority=10, policy=3,
is_outbound=0)
cls.vapi.ipsec_spd_add_del_entry(cls.tun_spd_id, scapy_tun_sa_id,
r_startaddr, r_stopaddr, l_startaddr,
l_stopaddr, priority=10, policy=3)
self.vapi.ipsec_spd_add_del_entry(self.tun_spd_id, scapy_tun_sa_id,
l_startaddr, l_stopaddr, r_startaddr,
r_stopaddr,
protocol=socket.IPPROTO_ESP)
self.vapi.ipsec_spd_add_del_entry(self.tun_spd_id, scapy_tun_sa_id,
l_startaddr, l_stopaddr, r_startaddr,
r_stopaddr, is_outbound=0,
protocol=socket.IPPROTO_ESP)
self.vapi.ipsec_spd_add_del_entry(self.tun_spd_id, scapy_tun_sa_id,
l_startaddr, l_stopaddr, r_startaddr,
r_stopaddr, remote_port_start=4500,
remote_port_stop=4500,
protocol=socket.IPPROTO_UDP)
self.vapi.ipsec_spd_add_del_entry(self.tun_spd_id, scapy_tun_sa_id,
l_startaddr, l_stopaddr, r_startaddr,
r_stopaddr, remote_port_start=4500,
remote_port_stop=4500,
protocol=socket.IPPROTO_UDP,
is_outbound=0)
l_startaddr = l_stopaddr = self.tun_if.remote_addr_n[addr_type]
r_startaddr = r_stopaddr = self.pg1.remote_addr_n[addr_type]
self.vapi.ipsec_spd_add_del_entry(self.tun_spd_id, vpp_tun_sa_id,
l_startaddr, l_stopaddr, r_startaddr,
r_stopaddr, priority=10, policy=3,
is_outbound=0)
self.vapi.ipsec_spd_add_del_entry(self.tun_spd_id, scapy_tun_sa_id,
r_startaddr, r_stopaddr, l_startaddr,
l_stopaddr, priority=10, policy=3)
def test_ipsec_nat_tun(self):
""" IPSec/NAT tunnel test case """

View File

@ -11,12 +11,11 @@ class TemplateIpsecTunIfEsp(TemplateIpsec):
encryption_type = ESP
@classmethod
def setUpClass(cls):
super(TemplateIpsecTunIfEsp, cls).setUpClass()
cls.tun_if = cls.pg0
def setUp(self):
super(TemplateIpsecTunIfEsp, self).setUp()
self.tun_if = self.pg0
p = self.ipv4_params
tun_if = VppIpsecTunInterface(self, self.pg0, p.vpp_tun_spi,
p.scapy_tun_spi, p.crypt_algo_vpp_id,