linux-cp: Add tests for tun devices

Type: test

Signed-off-by: Neale Ranns <neale@graphiant.com>
Change-Id: Iec69d8624b15766ed65e7d09777819d2242dee17
This commit is contained in:
Neale Ranns
2021-06-07 09:34:07 +00:00
committed by Ole Tr�an
parent 6197cb730e
commit 89d939e52c
6 changed files with 298 additions and 5 deletions

View File

@ -696,9 +696,40 @@ vl_api_ipsec_itf_delete_t_handler (vl_api_ipsec_itf_delete_t * mp)
REPLY_MACRO (VL_API_IPSEC_ITF_DELETE_REPLY); REPLY_MACRO (VL_API_IPSEC_ITF_DELETE_REPLY);
} }
static walk_rc_t
send_ipsec_itf_details (ipsec_itf_t *itf, void *arg)
{
ipsec_dump_walk_ctx_t *ctx = arg;
vl_api_ipsec_itf_details_t *mp;
mp = vl_msg_api_alloc (sizeof (*mp));
clib_memset (mp, 0, sizeof (*mp));
mp->_vl_msg_id = ntohs (VL_API_IPSEC_ITF_DETAILS);
mp->context = ctx->context;
mp->itf.mode = tunnel_mode_encode (itf->ii_mode);
mp->itf.user_instance = htonl (itf->ii_user_instance);
mp->itf.sw_if_index = htonl (itf->ii_sw_if_index);
vl_api_send_msg (ctx->reg, (u8 *) mp);
return (WALK_CONTINUE);
}
static void static void
vl_api_ipsec_itf_dump_t_handler (vl_api_ipsec_itf_dump_t * mp) vl_api_ipsec_itf_dump_t_handler (vl_api_ipsec_itf_dump_t * mp)
{ {
vl_api_registration_t *reg;
reg = vl_api_client_index_to_registration (mp->client_index);
if (!reg)
return;
ipsec_dump_walk_ctx_t ctx = {
.reg = reg,
.context = mp->context,
};
ipsec_itf_walk (send_ipsec_itf_details, &ctx);
} }
typedef struct ipsec_sa_dump_match_ctx_t_ typedef struct ipsec_sa_dump_match_ctx_t_

View File

@ -342,6 +342,18 @@ ipsec_itf_delete (u32 sw_if_index)
return 0; return 0;
} }
void
ipsec_itf_walk (ipsec_itf_walk_cb_t cb, void *ctx)
{
ipsec_itf_t *itf;
pool_foreach (itf, ipsec_itf_pool)
{
if (WALK_CONTINUE != cb (itf, ctx))
break;
}
}
static clib_error_t * static clib_error_t *
ipsec_itf_create_cli (vlib_main_t * vm, ipsec_itf_create_cli (vlib_main_t * vm,
unformat_input_t * input, vlib_cli_command_t * cmd) unformat_input_t * input, vlib_cli_command_t * cmd)

View File

@ -110,6 +110,9 @@ extern u8 *format_ipsec_itf (u8 * s, va_list * a);
extern ipsec_itf_t *ipsec_itf_get (index_t ii); extern ipsec_itf_t *ipsec_itf_get (index_t ii);
typedef walk_rc_t (*ipsec_itf_walk_cb_t) (ipsec_itf_t *itf, void *ctx);
extern void ipsec_itf_walk (ipsec_itf_walk_cb_t cd, void *ctx);
/* /*
* fd.io coding-style-patch-verification: ON * fd.io coding-style-patch-verification: ON
* *

View File

@ -6,8 +6,16 @@ from scapy.layers.inet import IP, UDP
from scapy.layers.inet6 import IPv6, Raw from scapy.layers.inet6 import IPv6, Raw
from scapy.layers.l2 import Ether, ARP, Dot1Q from scapy.layers.l2 import Ether, ARP, Dot1Q
from util import reassemble4
from vpp_object import VppObject from vpp_object import VppObject
from framework import VppTestCase, VppTestRunner from framework import VppTestCase, VppTestRunner
from vpp_ipip_tun_interface import VppIpIpTunInterface
from template_ipsec import TemplateIpsec, IpsecTun4Tests, \
IpsecTun4, mk_scapy_crypt_key, config_tun_params
from template_ipsec import TemplateIpsec, IpsecTun4Tests, \
IpsecTun4, mk_scapy_crypt_key, config_tun_params
from test_ipsec_tun_if_esp import TemplateIpsecItf4
from vpp_ipsec import VppIpsecSA, VppIpsecTunProtect, VppIpsecInterface
class VppLcpPair(VppObject): class VppLcpPair(VppObject):
@ -62,10 +70,13 @@ class TestLinuxCP(VppTestCase):
def setUp(self): def setUp(self):
super(TestLinuxCP, self).setUp() super(TestLinuxCP, self).setUp()
# create 4 pg interfaces so there are a few addresses # create 4 pg interfaces so we can create two pairs
# in the FIB
self.create_pg_interfaces(range(4)) self.create_pg_interfaces(range(4))
# create on ip4 and one ip6 pg tun
self.pg_interfaces += self.create_pg_ip4_interfaces(range(4, 5))
self.pg_interfaces += self.create_pg_ip6_interfaces(range(5, 6))
for i in self.pg_interfaces: for i in self.pg_interfaces:
i.admin_up() i.admin_up()
@ -169,6 +180,237 @@ class TestLinuxCP(VppTestCase):
for phy in phys: for phy in phys:
phy.unconfig_ip4() phy.unconfig_ip4()
def test_linux_cp_tun(self):
""" Linux CP TUN """
#
# Setup
#
N_PKTS = 31
# create two pairs, wihch a bunch of hots on the phys
hosts = [self.pg4, self.pg5]
phy = self.pg2
phy.config_ip4()
phy.config_ip6()
phy.resolve_arp()
phy.resolve_ndp()
tun4 = VppIpIpTunInterface(
self,
phy,
phy.local_ip4,
phy.remote_ip4).add_vpp_config()
tun6 = VppIpIpTunInterface(
self,
phy,
phy.local_ip6,
phy.remote_ip6).add_vpp_config()
tuns = [tun4, tun6]
tun4.admin_up()
tun4.config_ip4()
tun6.admin_up()
tun6.config_ip6()
pair1 = VppLcpPair(self, tuns[0], hosts[0]).add_vpp_config()
pair2 = VppLcpPair(self, tuns[1], hosts[1]).add_vpp_config()
self.logger.info(self.vapi.cli("sh lcp adj verbose"))
self.logger.info(self.vapi.cli("sh lcp"))
self.logger.info(self.vapi.cli("sh ip punt redirect"))
#
# Traffic Tests
#
# host to phy for v4
p = (IP(src=tun4.local_ip4, dst="2.2.2.2") /
UDP(sport=1234, dport=1234) /
Raw())
rxs = self.send_and_expect(self.pg4, p * N_PKTS, phy)
# verify inner packet is unchanged and has the tunnel encap
for rx in rxs:
self.assertEqual(rx[Ether].dst, phy.remote_mac)
self.assertEqual(rx[IP].dst, phy.remote_ip4)
self.assertEqual(rx[IP].src, phy.local_ip4)
inner = IP(rx[IP].payload)
self.assertEqual(inner.src, tun4.local_ip4)
self.assertEqual(inner.dst, "2.2.2.2")
# host to phy for v6
p = (IPv6(src=tun6.local_ip6, dst="2::2") /
UDP(sport=1234, dport=1234) /
Raw())
rxs = self.send_and_expect(self.pg5, p * N_PKTS, phy)
# verify inner packet is unchanged and has the tunnel encap
for rx in rxs:
self.assertEqual(rx[IPv6].dst, phy.remote_ip6)
self.assertEqual(rx[IPv6].src, phy.local_ip6)
inner = IPv6(rx[IPv6].payload)
self.assertEqual(inner.src, tun6.local_ip6)
self.assertEqual(inner.dst, "2::2")
# phy to host v4
p = (Ether(dst=phy.local_mac, src=phy.remote_mac) /
IP(dst=phy.local_ip4, src=phy.remote_ip4) /
IP(dst=tun4.local_ip4, src=tun4.remote_ip4) /
UDP(sport=1234, dport=1234) /
Raw())
rxs = self.send_and_expect(phy, p * N_PKTS, self.pg4)
for rx in rxs:
rx = IP(rx)
self.assertEqual(rx[IP].dst, tun4.local_ip4)
self.assertEqual(rx[IP].src, tun4.remote_ip4)
# phy to host v6
p = (Ether(dst=phy.local_mac, src=phy.remote_mac) /
IPv6(dst=phy.local_ip6, src=phy.remote_ip6) /
IPv6(dst=tun6.local_ip6, src=tun6.remote_ip6) /
UDP(sport=1234, dport=1234) /
Raw())
rxs = self.send_and_expect(phy, p * N_PKTS, self.pg5)
for rx in rxs:
rx = IPv6(rx)
self.assertEqual(rx[IPv6].dst, tun6.local_ip6)
self.assertEqual(rx[IPv6].src, tun6.remote_ip6)
# cleanup
phy.unconfig_ip4()
phy.unconfig_ip6()
tun4.unconfig_ip4()
tun6.unconfig_ip6()
class TestLinuxCPIpsec(TemplateIpsec,
TemplateIpsecItf4,
IpsecTun4):
""" IPsec Interface IPv4 """
extra_vpp_plugin_config = ["plugin",
"linux_cp_plugin.so",
"{", "enable", "}",
"plugin",
"linux_cp_unittest_plugin.so",
"{", "enable", "}"]
def setUp(self):
super(TestLinuxCPIpsec, self).setUp()
self.tun_if = self.pg0
self.pg_interfaces += self.create_pg_ip4_interfaces(range(3, 4))
self.pg_interfaces += self.create_pg_ip6_interfaces(range(4, 5))
def tearDown(self):
super(TestLinuxCPIpsec, self).tearDown()
def verify_encrypted(self, p, sa, rxs):
decrypt_pkts = []
for rx in rxs:
if p.nat_header:
self.assertEqual(rx[UDP].dport, 4500)
self.assert_packet_checksums_valid(rx)
self.assertEqual(len(rx) - len(Ether()), rx[IP].len)
try:
rx_ip = rx[IP]
decrypt_pkt = p.vpp_tun_sa.decrypt(rx_ip)
if not decrypt_pkt.haslayer(IP):
decrypt_pkt = IP(decrypt_pkt[Raw].load)
if rx_ip.proto == socket.IPPROTO_ESP:
self.verify_esp_padding(sa, rx_ip[ESP].data, decrypt_pkt)
decrypt_pkts.append(decrypt_pkt)
self.assert_equal(decrypt_pkt.src, p.tun_if.local_ip4)
self.assert_equal(decrypt_pkt.dst, p.tun_if.remote_ip4)
except:
self.logger.debug(ppp("Unexpected packet:", rx))
try:
self.logger.debug(ppp("Decrypted packet:", decrypt_pkt))
except:
pass
raise
pkts = reassemble4(decrypt_pkts)
for pkt in pkts:
self.assert_packet_checksums_valid(pkt)
def verify_decrypted(self, p, rxs):
for rx in rxs:
rx = IP(rx)
self.assert_equal(rx[IP].src, p.tun_if.remote_ip4)
self.assert_equal(rx[IP].dst, p.tun_if.local_ip4)
self.assert_packet_checksums_valid(rx)
def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1,
payload_size=54):
return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
sa.encrypt(IP(src=src, dst=dst) /
UDP(sport=1111, dport=2222) /
Raw(b'X' * payload_size))
for i in range(count)]
def test_linux_cp_ipsec4_tun(self):
""" Linux CP Ipsec TUN """
#
# Setup
#
N_PKTS = 31
# the pg that paris with the tunnel
self.host = self.pg3
# tunnel and protection setup
p = self.ipv4_params
self.config_network(p)
self.config_sa_tun(p,
self.pg0.local_ip4,
self.pg0.remote_ip4)
self.config_protect(p)
pair = VppLcpPair(self, p.tun_if, self.host).add_vpp_config()
self.logger.error(self.vapi.cli("sh int addr"))
self.logger.info(self.vapi.cli("sh lcp"))
self.logger.info(self.vapi.cli("sh ip punt redirect"))
#
# Traffic Tests
#
# host to phy for v4
pkt = (IP(src=p.tun_if.local_ip4,
dst=p.tun_if.remote_ip4) /
UDP(sport=1234, dport=1234) /
Raw())
rxs = self.send_and_expect(self.host, pkt * N_PKTS, self.tun_if)
self.verify_encrypted(p, p.vpp_tun_sa, rxs)
# phy to host for v4
pkts = self.gen_encrypt_pkts(p, p.scapy_tun_sa, self.tun_if,
src=p.tun_if.remote_ip4,
dst=p.tun_if.local_ip4,
count=N_PKTS)
try:
rxs = self.send_and_expect(self.tun_if, pkts, self.host)
self.verify_decrypted(p, rxs)
finally:
self.logger.error(self.vapi.cli("sh trace"))
# cleanup
pair.remove_vpp_config()
self.unconfig_protect(p)
self.unconfig_sa(p)
self.unconfig_network(p)
if __name__ == '__main__': if __name__ == '__main__':
unittest.main(testRunner=VppTestRunner) unittest.main(testRunner=VppTestRunner)

View File

@ -34,6 +34,9 @@ class VppIpIpTunInterface(VppTunnelInterface):
'mode': self.mode, 'mode': self.mode,
}) })
self.set_sw_if_index(r.sw_if_index) self.set_sw_if_index(r.sw_if_index)
r = self.test.vapi.ipip_tunnel_dump(
sw_if_index=self.sw_if_index)
self.instance = r[0].tunnel.instance
self.test.registry.register(self, self.test.logger) self.test.registry.register(self, self.test.logger)
return self return self
@ -51,7 +54,7 @@ class VppIpIpTunInterface(VppTunnelInterface):
return self.object_id() return self.object_id()
def object_id(self): def object_id(self):
return "ipip-%d" % self._sw_if_index return "ipip%d" % self.instance
@property @property
def remote_ip(self): def remote_ip(self):

View File

@ -412,6 +412,8 @@ class VppIpsecInterface(VppInterface):
}) })
self.set_sw_if_index(r.sw_if_index) self.set_sw_if_index(r.sw_if_index)
self.test.registry.register(self, self.test.logger) self.test.registry.register(self, self.test.logger)
ts = self.test.vapi.ipsec_itf_dump(sw_if_index=self._sw_if_index)
self.instance = ts[0].itf.user_instance
return self return self
def remove_vpp_config(self): def remove_vpp_config(self):
@ -420,7 +422,7 @@ class VppIpsecInterface(VppInterface):
def query_vpp_config(self): def query_vpp_config(self):
ts = self.test.vapi.ipsec_itf_dump(sw_if_index=0xffffffff) ts = self.test.vapi.ipsec_itf_dump(sw_if_index=0xffffffff)
for t in ts: for t in ts:
if t.tunnel.sw_if_index == self._sw_if_index: if t.itf.sw_if_index == self._sw_if_index:
return True return True
return False return False
@ -428,4 +430,4 @@ class VppIpsecInterface(VppInterface):
return self.object_id() return self.object_id()
def object_id(self): def object_id(self):
return "ipsec-%d" % self._sw_if_index return "ipsec%d" % self.instance