IPSEC: tests use opbject registry

this means we test the dumps - to some extent

Change-Id: I8d90745701012012b41a7b3aaf9be97b4dd2bdf8
Signed-off-by: Neale Ranns <nranns@cisco.com>
This commit is contained in:
Neale Ranns
2019-01-24 04:52:25 -08:00
parent 9e47ac54c9
commit 311124e21b
8 changed files with 590 additions and 540 deletions
+11 -1
View File
@@ -460,7 +460,7 @@ show_ipsec_command_fn (vlib_main_t * vm,
vnet_hw_interface_t *hi;
u8 *protocol = NULL;
u8 *policy = NULL;
u32 tx_table_id;
u32 tx_table_id, spd_id, sw_if_index;
/* *INDENT-OFF* */
pool_foreach (sa, im->sad, ({
@@ -665,6 +665,16 @@ show_ipsec_command_fn (vlib_main_t * vm,
}));
/* *INDENT-ON* */
vlib_cli_output (vm, "SPD Bindings:");
/* *INDENT-OFF* */
hash_foreach(sw_if_index, spd_id, im->spd_index_by_sw_if_index, ({
vlib_cli_output (vm, " %d -> %U", spd_id,
format_vnet_sw_if_index_name, im->vnet_main,
sw_if_index);
}));
/* *INDENT-ON* */
vlib_cli_output (vm, "tunnel interfaces");
/* *INDENT-OFF* */
pool_foreach (t, im->tunnel_interfaces, ({
+131 -257
View File
File diff suppressed because it is too large Load Diff
+106 -228
View File
File diff suppressed because it is too large Load Diff
+70 -51
View File
@@ -7,6 +7,9 @@ from scapy.layers.inet import ICMP, IP, TCP, UDP
from scapy.layers.ipsec import SecurityAssociation, ESP
from util import ppp, ppc
from template_ipsec import TemplateIpsec
from vpp_ipsec import *
from vpp_ip_route import VppIpRoute, VppRoutePath
from vpp_ip import DpoProto
class IPSecNATTestCase(TemplateIpsec):
@@ -34,16 +37,25 @@ class IPSecNATTestCase(TemplateIpsec):
def setUp(self):
super(IPSecNATTestCase, self).setUp()
self.tun_if = self.pg0
self.vapi.ipsec_spd_add_del(self.tun_spd_id)
self.vapi.ipsec_interface_add_del_spd(self.tun_spd_id,
self.tun_if.sw_if_index)
self.tun_spd = VppIpsecSpd(self, self.tun_spd_id)
self.tun_spd.add_vpp_config()
VppIpsecSpdItfBinding(self, self.tun_spd,
self.tun_if).add_vpp_config()
p = self.ipv4_params
self.config_esp_tun(p)
self.logger.info(self.vapi.ppcli("show ipsec"))
src = socket.inet_pton(p.addr_type, p.remote_tun_if_host)
self.vapi.ip_add_del_route(src, p.addr_len,
self.tun_if.remote_addr_n[p.addr_type],
is_ipv6=p.is_ipv6)
d = DpoProto.DPO_PROTO_IP6 if p.is_ipv6 else DpoProto.DPO_PROTO_IP4
VppIpRoute(self, p.remote_tun_if_host, p.addr_len,
[VppRoutePath(self.tun_if.remote_addr[p.addr_type],
0xffffffff,
proto=d)],
is_ip6=p.is_ipv6).add_vpp_config()
def tearDown(self):
super(IPSecNATTestCase, self).tearDown()
def create_stream_plain(self, src_mac, dst_mac, src_ip, dst_ip):
return [
@@ -142,50 +154,57 @@ class IPSecNATTestCase(TemplateIpsec):
crypt_key = params.crypt_key
addr_any = params.addr_any
addr_bcast = params.addr_bcast
self.vapi.ipsec_sad_add_del_entry(scapy_tun_sa_id, scapy_tun_spi,
auth_algo_vpp_id, auth_key,
crypt_algo_vpp_id, crypt_key,
self.vpp_esp_protocol,
self.pg1.remote_addr_n[addr_type],
self.tun_if.remote_addr_n[addr_type],
udp_encap=1)
self.vapi.ipsec_sad_add_del_entry(vpp_tun_sa_id, vpp_tun_spi,
auth_algo_vpp_id, auth_key,
crypt_algo_vpp_id, crypt_key,
self.vpp_esp_protocol,
self.tun_if.remote_addr_n[addr_type],
self.pg1.remote_addr_n[addr_type],
udp_encap=1)
l_startaddr = r_startaddr = socket.inet_pton(addr_type, addr_any)
l_stopaddr = r_stopaddr = socket.inet_pton(addr_type, addr_bcast)
self.vapi.ipsec_spd_add_del_entry(self.tun_spd_id, scapy_tun_sa_id,
l_startaddr, l_stopaddr, r_startaddr,
r_stopaddr,
protocol=socket.IPPROTO_ESP)
self.vapi.ipsec_spd_add_del_entry(self.tun_spd_id, scapy_tun_sa_id,
l_startaddr, l_stopaddr, r_startaddr,
r_stopaddr, is_outbound=0,
protocol=socket.IPPROTO_ESP)
self.vapi.ipsec_spd_add_del_entry(self.tun_spd_id, scapy_tun_sa_id,
l_startaddr, l_stopaddr, r_startaddr,
r_stopaddr, remote_port_start=4500,
remote_port_stop=4500,
protocol=socket.IPPROTO_UDP)
self.vapi.ipsec_spd_add_del_entry(self.tun_spd_id, scapy_tun_sa_id,
l_startaddr, l_stopaddr, r_startaddr,
r_stopaddr, remote_port_start=4500,
remote_port_stop=4500,
protocol=socket.IPPROTO_UDP,
is_outbound=0)
l_startaddr = l_stopaddr = self.tun_if.remote_addr_n[addr_type]
r_startaddr = r_stopaddr = self.pg1.remote_addr_n[addr_type]
self.vapi.ipsec_spd_add_del_entry(self.tun_spd_id, vpp_tun_sa_id,
l_startaddr, l_stopaddr, r_startaddr,
r_stopaddr, priority=10, policy=3,
is_outbound=0)
self.vapi.ipsec_spd_add_del_entry(self.tun_spd_id, scapy_tun_sa_id,
r_startaddr, r_stopaddr, l_startaddr,
l_stopaddr, priority=10, policy=3)
VppIpsecSA(self, scapy_tun_sa_id, scapy_tun_spi,
auth_algo_vpp_id, auth_key,
crypt_algo_vpp_id, crypt_key,
self.vpp_esp_protocol,
self.pg1.remote_addr[addr_type],
self.tun_if.remote_addr[addr_type],
udp_encap=1).add_vpp_config()
VppIpsecSA(self, vpp_tun_sa_id, vpp_tun_spi,
auth_algo_vpp_id, auth_key,
crypt_algo_vpp_id, crypt_key,
self.vpp_esp_protocol,
self.tun_if.remote_addr[addr_type],
self.pg1.remote_addr[addr_type],
udp_encap=1).add_vpp_config()
VppIpsecSpdEntry(self, self.tun_spd, scapy_tun_sa_id,
addr_any, addr_bcast,
addr_any, addr_bcast,
socket.IPPROTO_ESP).add_vpp_config()
VppIpsecSpdEntry(self, self.tun_spd, scapy_tun_sa_id,
addr_any, addr_bcast,
addr_any, addr_bcast,
socket.IPPROTO_ESP,
is_outbound=0).add_vpp_config()
VppIpsecSpdEntry(self, self.tun_spd, scapy_tun_sa_id,
addr_any, addr_bcast,
addr_any, addr_bcast,
socket.IPPROTO_UDP,
remote_port_start=4500,
remote_port_stop=4500).add_vpp_config()
VppIpsecSpdEntry(self, self.tun_spd, scapy_tun_sa_id,
addr_any, addr_bcast,
addr_any, addr_bcast,
socket.IPPROTO_UDP,
remote_port_start=4500,
remote_port_stop=4500,
is_outbound=0).add_vpp_config()
VppIpsecSpdEntry(self, self.tun_spd, vpp_tun_sa_id,
self.tun_if.remote_addr[addr_type],
self.tun_if.remote_addr[addr_type],
self.pg1.remote_addr[addr_type],
self.pg1.remote_addr[addr_type],
0, priority=10, policy=3,
is_outbound=0).add_vpp_config()
VppIpsecSpdEntry(self, self.tun_spd, scapy_tun_sa_id,
self.pg1.remote_addr[addr_type],
self.pg1.remote_addr[addr_type],
self.tun_if.remote_addr[addr_type],
self.tun_if.remote_addr[addr_type],
0, priority=10, policy=3).add_vpp_config()
def test_ipsec_nat_tun(self):
""" IPSec/NAT tunnel test case """
+5 -2
View File
@@ -4,6 +4,7 @@ from scapy.layers.ipsec import ESP
from framework import VppTestRunner
from template_ipsec import TemplateIpsec, IpsecTun4Tests, IpsecTcpTests
from vpp_ipsec_tun_interface import VppIpsecTunInterface
from vpp_ip_route import VppIpRoute, VppRoutePath
class TemplateIpsecTunIfEsp(TemplateIpsec):
@@ -25,8 +26,10 @@ class TemplateIpsecTunIfEsp(TemplateIpsec):
tun_if.add_vpp_config()
tun_if.admin_up()
tun_if.config_ip4()
src4 = socket.inet_pton(socket.AF_INET, p.remote_tun_if_host)
self.vapi.ip_add_del_route(src4, 32, tun_if.remote_ip4n)
VppIpRoute(self, p.remote_tun_if_host, 32,
[VppRoutePath(tun_if.remote_ip4,
0xffffffff)]).add_vpp_config()
def tearDown(self):
if not self.vpp_dead:
+1 -1
View File
@@ -279,13 +279,13 @@ class VppRoutePath(object):
is_dvr=0,
next_hop_id=0xffffffff,
proto=DpoProto.DPO_PROTO_IP4):
self.proto = proto
self.nh_itf = nh_sw_if_index
self.nh_table_id = nh_table_id
self.nh_via_label = nh_via_label
self.nh_labels = labels
self.weight = 1
self.rpf_id = rpf_id
self.proto = proto
if self.proto is DpoProto.DPO_PROTO_IP6:
self.nh_addr = inet_pton(AF_INET6, nh_addr)
elif self.proto is DpoProto.DPO_PROTO_IP4:
+249
View File
@@ -0,0 +1,249 @@
from vpp_object import *
from ipaddress import ip_address
try:
text_type = unicode
except NameError:
text_type = str
class VppIpsecSpd(VppObject):
"""
VPP SPD DB
"""
def __init__(self, test, id):
self.test = test
self.id = id
def add_vpp_config(self):
self.test.vapi.ipsec_spd_add_del(self.id)
self.test.registry.register(self, self.test.logger)
def remove_vpp_config(self):
self.test.vapi.ipsec_spd_add_del(self.id, is_add=0)
def __str__(self):
return self.object_id()
def object_id(self):
return "ipsec-spd-%d" % self.id
def query_vpp_config(self):
spds = self.test.vapi.ipsec_spds_dump()
for spd in spds:
if spd.spd_id == self.id:
return True
return False
class VppIpsecSpdItfBinding(VppObject):
"""
VPP SPD DB to interface binding
(i.e. this SPD is used on this interfce)
"""
def __init__(self, test, spd, itf):
self.test = test
self.spd = spd
self.itf = itf
def add_vpp_config(self):
self.test.vapi.ipsec_interface_add_del_spd(self.spd.id,
self.itf.sw_if_index)
self.test.registry.register(self, self.test.logger)
def remove_vpp_config(self):
self.test.vapi.ipsec_interface_add_del_spd(self.spd.id,
self.itf.sw_if_index,
is_add=0)
def __str__(self):
return self.object_id()
def object_id(self):
return "bind-%s-to-%s" % (self.spd.id, self.itf)
def query_vpp_config(self):
bs = self.test.vapi.ipsec_spd_interface_dump()
for b in bs:
if b.sw_if_index == self.itf.sw_if_index:
return True
return False
class VppIpsecSpdEntry(VppObject):
"""
VPP SPD DB Entry
"""
def __init__(self, test, spd, sa_id,
local_start, local_stop,
remote_start, remote_stop,
proto,
priority=100,
policy=0,
is_outbound=1,
remote_port_start=0,
remote_port_stop=65535,
local_port_start=0,
local_port_stop=65535):
self.test = test
self.spd = spd
self.sa_id = sa_id
self.local_start = ip_address(text_type(local_start))
self.local_stop = ip_address(text_type(local_stop))
self.remote_start = ip_address(text_type(remote_start))
self.remote_stop = ip_address(text_type(remote_stop))
self.proto = proto
self.is_outbound = is_outbound
self.priority = priority
self.policy = policy
self.is_ipv6 = (0 if self.local_start.version == 4 else 1)
self.local_port_start = local_port_start
self.local_port_stop = local_port_stop
self.remote_port_start = remote_port_start
self.remote_port_stop = remote_port_stop
def add_vpp_config(self):
self.test.vapi.ipsec_spd_add_del_entry(
self.spd.id,
self.sa_id,
self.local_start.packed,
self.local_stop.packed,
self.remote_start.packed,
self.remote_stop.packed,
protocol=self.proto,
is_ipv6=self.is_ipv6,
is_outbound=self.is_outbound,
priority=self.priority,
policy=self.policy,
local_port_start=self.local_port_start,
local_port_stop=self.local_port_stop,
remote_port_start=self.remote_port_start,
remote_port_stop=self.remote_port_stop)
self.test.registry.register(self, self.test.logger)
def remove_vpp_config(self):
self.test.vapi.ipsec_spd_add_del_entry(
self.spd.id,
self.sa_id,
self.local_start.packed,
self.local_stop.packed,
self.remote_start.packed,
self.remote_stop.packed,
protocol=self.proto,
is_ipv6=self.is_ipv6,
is_outbound=self.is_outbound,
priority=self.priority,
policy=self.policy,
local_port_start=self.local_port_start,
local_port_stop=self.local_port_stop,
remote_port_start=self.remote_port_start,
remote_port_stop=self.remote_port_stop,
is_add=0)
def __str__(self):
return self.object_id()
def object_id(self):
return "spd-entry-%d-%d-%d-%d-%d-%d" % (self.spd.id,
self.priority,
self.policy,
self.is_outbound,
self.is_ipv6,
self.remote_port_start)
def query_vpp_config(self):
ss = self.test.vapi.ipsec_spd_dump(self.spd.id)
for s in ss:
if s.sa_id == self.sa_id and \
s.is_outbound == self.is_outbound and \
s.priority == self.priority and \
s.policy == self.policy and \
s.is_ipv6 == self.is_ipv6 and \
s.remote_start_port == self.remote_port_start:
return True
return False
class VppIpsecSA(VppObject):
"""
VPP SAD Entry
"""
def __init__(self, test, id, spi,
integ_alg, integ_key,
crypto_alg, crypto_key,
proto,
tun_src=None, tun_dst=None,
use_anti_replay=0,
udp_encap=0):
self.test = test
self.id = id
self.spi = spi
self.integ_alg = integ_alg
self.integ_key = integ_key
self.crypto_alg = crypto_alg
self.crypto_key = crypto_key
self.proto = proto
self.is_tunnel = 0
self.is_tunnel_v6 = 0
self.tun_src = tun_src
self.tun_dst = tun_dst
if (tun_src):
self.tun_src = ip_address(text_type(tun_src))
self.is_tunnel = 1
if (self.tun_src.version == 6):
self.is_tunnel_v6 = 1
if (tun_dst):
self.tun_dst = ip_address(text_type(tun_dst))
self.use_anti_replay = use_anti_replay
self.udp_encap = udp_encap
def add_vpp_config(self):
self.test.vapi.ipsec_sad_add_del_entry(
self.id,
self.spi,
self.integ_alg,
self.integ_key,
self.crypto_alg,
self.crypto_key,
self.proto,
(self.tun_src.packed if self.tun_src else []),
(self.tun_dst.packed if self.tun_dst else []),
is_tunnel=self.is_tunnel,
is_tunnel_ipv6=self.is_tunnel_v6,
use_anti_replay=self.use_anti_replay,
udp_encap=self.udp_encap)
self.test.registry.register(self, self.test.logger)
def remove_vpp_config(self):
self.test.vapi.ipsec_sad_add_del_entry(
self.id,
self.spi,
self.integ_alg,
self.integ_key,
self.crypto_alg,
self.crypto_key,
self.proto,
(self.tun_src.packed if self.tun_src else []),
(self.tun_dst.packed if self.tun_dst else []),
is_tunnel=self.is_tunnel,
is_tunnel_ipv6=self.is_tunnel_v6,
use_anti_replay=self.use_anti_replay,
udp_encap=self.udp_encap,
is_add=0)
def __str__(self):
return self.object_id()
def object_id(self):
return "ipsec-sa-%d" % self.id
def query_vpp_config(self):
bs = self.test.vapi.ipsec_sa_dump()
for b in bs:
if b.sa_id == self.id:
return True
return False
+17
View File
@@ -3347,6 +3347,9 @@ class VppPapiProvider(object):
self.papi.ipsec_spd_add_del, {
'spd_id': spd_id, 'is_add': is_add})
def ipsec_spds_dump(self):
return self.api(self.papi.ipsec_spds_dump, {})
def ipsec_interface_add_del_spd(self, spd_id, sw_if_index, is_add=1):
""" IPSEC interface SPD add/del - \
Wrapper to associate/disassociate SPD to interface in VPP
@@ -3363,6 +3366,11 @@ class VppPapiProvider(object):
self.papi.ipsec_interface_add_del_spd,
{'spd_id': spd_id, 'sw_if_index': sw_if_index, 'is_add': is_add})
def ipsec_spd_interface_dump(self, spd_index=None):
return self.api(self.papi.ipsec_spd_interface_dump,
{'spd_index': spd_index if spd_index else 0,
'spd_index_valid': 1 if spd_index else 0})
def ipsec_sad_add_del_entry(self,
sad_id,
spi,
@@ -3415,6 +3423,10 @@ class VppPapiProvider(object):
'use_extended_sequence_number': use_extended_sequence_number,
'use_anti_replay': use_anti_replay})
def ipsec_sa_dump(self, sa_id=None):
return self.api(self.papi.ipsec_sa_dump,
{'sa_id': sa_id if sa_id else 0xffffffff})
def ipsec_spd_add_del_entry(self,
spd_id,
sa_id,
@@ -3473,6 +3485,11 @@ class VppPapiProvider(object):
'is_ipv6': is_ipv6,
'is_ip_any': is_ip_any})
def ipsec_spd_dump(self, spd_id, sa_id=0xffffffff):
return self.api(self.papi.ipsec_spd_dump,
{'spd_id': spd_id,
'sa_id': sa_id})
def ipsec_tunnel_if_add_del(self, local_ip, remote_ip, local_spi,
remote_spi, crypto_alg, local_crypto_key,
remote_crypto_key, integ_alg, local_integ_key,