gso: add ipsec tunnel tests
Type: test Change-Id: I831bc8c21f8ce869054eafcb14542508039c1b82 Signed-off-by: Mohsin Kazmi <sykazmi@cisco.com>
This commit is contained in:

committed by
Beno�t Ganne

parent
a4d0956082
commit
0972edc1c2
247
test/test_gso.py
247
test/test_gso.py
@ -16,7 +16,9 @@ from scapy.layers.inet6 import ipv6nh, IPerror6
|
||||
from scapy.layers.inet import TCP, ICMP
|
||||
from scapy.layers.vxlan import VXLAN
|
||||
from scapy.data import ETH_P_IP, ETH_P_IPV6, ETH_P_ARP
|
||||
from scapy.layers.ipsec import SecurityAssociation, ESP
|
||||
|
||||
from vpp_papi import VppEnum
|
||||
from framework import VppTestCase, VppTestRunner
|
||||
from vpp_object import VppObject
|
||||
from vpp_interface import VppInterface
|
||||
@ -27,6 +29,9 @@ from vpp_vxlan_tunnel import VppVxlanTunnel
|
||||
from socket import AF_INET, AF_INET6, inet_pton
|
||||
from util import reassemble4
|
||||
|
||||
from vpp_ipsec import VppIpsecSA, VppIpsecTunProtect
|
||||
from template_ipsec import IPsecIPv4Params, IPsecIPv6Params, \
|
||||
mk_scapy_crypt_key, config_tun_params
|
||||
|
||||
""" Test_gso is a subclass of VPPTestCase classes.
|
||||
GSO tests.
|
||||
@ -718,5 +723,247 @@ class TestGSO(VppTestCase):
|
||||
self.vapi.feature_gso_enable_disable(sw_if_index=self.pg0.sw_if_index,
|
||||
enable_disable=0)
|
||||
|
||||
def test_gso_ipsec(self):
|
||||
""" GSO IPSEC test """
|
||||
#
|
||||
# Send jumbo frame with gso enabled only on input interface and
|
||||
# create IPIP tunnel on VPP pg0.
|
||||
#
|
||||
|
||||
#
|
||||
# enable ipip4
|
||||
#
|
||||
self.ipip4.add_vpp_config()
|
||||
self.vapi.feature_gso_enable_disable(
|
||||
sw_if_index=self.ipip4.sw_if_index, enable_disable=1)
|
||||
|
||||
# Add IPv4 routes via tunnel interface
|
||||
self.ip4_via_ip4_tunnel = VppIpRoute(
|
||||
self, "172.16.10.0", 24,
|
||||
[VppRoutePath("0.0.0.0",
|
||||
self.ipip4.sw_if_index,
|
||||
proto=FibPathProto.FIB_PATH_NH_PROTO_IP4)])
|
||||
self.ip4_via_ip4_tunnel.add_vpp_config()
|
||||
|
||||
# IPSec config
|
||||
self.ipv4_params = IPsecIPv4Params()
|
||||
self.encryption_type = ESP
|
||||
config_tun_params(self.ipv4_params, self.encryption_type, self.ipip4)
|
||||
|
||||
self.tun_sa_in_v4 = VppIpsecSA(self, self.ipv4_params.vpp_tun_sa_id,
|
||||
self.ipv4_params.vpp_tun_spi,
|
||||
self.ipv4_params.auth_algo_vpp_id,
|
||||
self.ipv4_params.auth_key,
|
||||
self.ipv4_params.crypt_algo_vpp_id,
|
||||
self.ipv4_params.crypt_key,
|
||||
VppEnum.vl_api_ipsec_proto_t.
|
||||
IPSEC_API_PROTO_ESP)
|
||||
self.tun_sa_in_v4.add_vpp_config()
|
||||
|
||||
self.tun_sa_out_v4 = VppIpsecSA(self, self.ipv4_params.scapy_tun_sa_id,
|
||||
self.ipv4_params.scapy_tun_spi,
|
||||
self.ipv4_params.auth_algo_vpp_id,
|
||||
self.ipv4_params.auth_key,
|
||||
self.ipv4_params.crypt_algo_vpp_id,
|
||||
self.ipv4_params.crypt_key,
|
||||
VppEnum.vl_api_ipsec_proto_t.
|
||||
IPSEC_API_PROTO_ESP)
|
||||
self.tun_sa_out_v4.add_vpp_config()
|
||||
|
||||
self.tun_protect_v4 = VppIpsecTunProtect(self,
|
||||
self.ipip4,
|
||||
self.tun_sa_out_v4,
|
||||
[self.tun_sa_in_v4])
|
||||
|
||||
self.tun_protect_v4.add_vpp_config()
|
||||
|
||||
# Set interface up and enable IP on it
|
||||
self.ipip4.admin_up()
|
||||
self.ipip4.set_unnumbered(self.pg0.sw_if_index)
|
||||
|
||||
#
|
||||
# IPv4/IPv4 - IPSEC
|
||||
#
|
||||
ipsec44 = (Ether(src=self.pg2.remote_mac, dst="02:fe:60:1e:a2:79") /
|
||||
IP(src=self.pg2.remote_ip4, dst="172.16.10.3", flags='DF') /
|
||||
TCP(sport=1234, dport=1234) /
|
||||
Raw(b'\xa5' * 65200))
|
||||
|
||||
rxs = self.send_and_expect(self.pg2, [ipsec44], self.pg0, 45)
|
||||
size = 0
|
||||
for rx in rxs:
|
||||
self.assertEqual(rx[Ether].src, self.pg0.local_mac)
|
||||
self.assertEqual(rx[Ether].dst, self.pg0.remote_mac)
|
||||
self.assertEqual(rx[IP].src, self.pg0.local_ip4)
|
||||
self.assertEqual(rx[IP].dst, self.pg0.remote_ip4)
|
||||
self.assertEqual(rx[IP].proto, 50) # ESP
|
||||
self.assertEqual(rx[ESP].spi, self.ipv4_params.scapy_tun_spi)
|
||||
inner = self.ipv4_params.vpp_tun_sa.decrypt(rx[IP])
|
||||
self.assertEqual(inner[IP].src, self.pg2.remote_ip4)
|
||||
self.assertEqual(inner[IP].dst, "172.16.10.3")
|
||||
size += inner[IP].len - 20 - 20
|
||||
self.assertEqual(size, 65200)
|
||||
|
||||
self.ip6_via_ip4_tunnel = VppIpRoute(
|
||||
self, "fd01:10::", 64,
|
||||
[VppRoutePath("::",
|
||||
self.ipip4.sw_if_index,
|
||||
proto=FibPathProto.FIB_PATH_NH_PROTO_IP6)])
|
||||
self.ip6_via_ip4_tunnel.add_vpp_config()
|
||||
#
|
||||
# IPv4/IPv6 - IPSEC
|
||||
#
|
||||
ipsec46 = (Ether(src=self.pg2.remote_mac, dst="02:fe:60:1e:a2:79") /
|
||||
IPv6(src=self.pg2.remote_ip6, dst="fd01:10::3") /
|
||||
TCP(sport=1234, dport=1234) /
|
||||
Raw(b'\xa5' * 65200))
|
||||
|
||||
rxs = self.send_and_expect(self.pg2, [ipsec46], self.pg0, 45)
|
||||
size = 0
|
||||
for rx in rxs:
|
||||
self.assertEqual(rx[Ether].src, self.pg0.local_mac)
|
||||
self.assertEqual(rx[Ether].dst, self.pg0.remote_mac)
|
||||
self.assertEqual(rx[IP].src, self.pg0.local_ip4)
|
||||
self.assertEqual(rx[IP].dst, self.pg0.remote_ip4)
|
||||
self.assertEqual(rx[IP].proto, 50) # ESP
|
||||
self.assertEqual(rx[ESP].spi, self.ipv4_params.scapy_tun_spi)
|
||||
inner = self.ipv4_params.vpp_tun_sa.decrypt(rx[IP])
|
||||
self.assertEqual(inner[IPv6].src, self.pg2.remote_ip6)
|
||||
self.assertEqual(inner[IPv6].dst, "fd01:10::3")
|
||||
size += inner[IPv6].plen - 20
|
||||
self.assertEqual(size, 65200)
|
||||
|
||||
# disable IPSec
|
||||
self.tun_protect_v4.remove_vpp_config()
|
||||
self.tun_sa_in_v4.remove_vpp_config()
|
||||
self.tun_sa_out_v4.remove_vpp_config()
|
||||
|
||||
#
|
||||
# disable ipip4
|
||||
#
|
||||
self.vapi.feature_gso_enable_disable(self.ipip4.sw_if_index,
|
||||
enable_disable=0)
|
||||
self.ip4_via_ip4_tunnel.remove_vpp_config()
|
||||
self.ip6_via_ip4_tunnel.remove_vpp_config()
|
||||
self.ipip4.remove_vpp_config()
|
||||
|
||||
#
|
||||
# enable ipip6
|
||||
#
|
||||
self.ipip6.add_vpp_config()
|
||||
self.vapi.feature_gso_enable_disable(self.ipip6.sw_if_index,
|
||||
enable_disable=1)
|
||||
|
||||
# Set interface up and enable IP on it
|
||||
self.ipip6.admin_up()
|
||||
self.ipip6.set_unnumbered(self.pg0.sw_if_index)
|
||||
|
||||
# Add IPv4 routes via tunnel interface
|
||||
self.ip4_via_ip6_tunnel = VppIpRoute(
|
||||
self, "172.16.10.0", 24,
|
||||
[VppRoutePath("0.0.0.0",
|
||||
self.ipip6.sw_if_index,
|
||||
proto=FibPathProto.FIB_PATH_NH_PROTO_IP4)])
|
||||
self.ip4_via_ip6_tunnel.add_vpp_config()
|
||||
|
||||
# IPSec config
|
||||
self.ipv6_params = IPsecIPv6Params()
|
||||
self.encryption_type = ESP
|
||||
config_tun_params(self.ipv6_params, self.encryption_type, self.ipip6)
|
||||
self.tun_sa_in_v6 = VppIpsecSA(self, self.ipv6_params.vpp_tun_sa_id,
|
||||
self.ipv6_params.vpp_tun_spi,
|
||||
self.ipv6_params.auth_algo_vpp_id,
|
||||
self.ipv6_params.auth_key,
|
||||
self.ipv6_params.crypt_algo_vpp_id,
|
||||
self.ipv6_params.crypt_key,
|
||||
VppEnum.vl_api_ipsec_proto_t.
|
||||
IPSEC_API_PROTO_ESP)
|
||||
self.tun_sa_in_v6.add_vpp_config()
|
||||
|
||||
self.tun_sa_out_v6 = VppIpsecSA(self, self.ipv6_params.scapy_tun_sa_id,
|
||||
self.ipv6_params.scapy_tun_spi,
|
||||
self.ipv6_params.auth_algo_vpp_id,
|
||||
self.ipv6_params.auth_key,
|
||||
self.ipv6_params.crypt_algo_vpp_id,
|
||||
self.ipv6_params.crypt_key,
|
||||
VppEnum.vl_api_ipsec_proto_t.
|
||||
IPSEC_API_PROTO_ESP)
|
||||
self.tun_sa_out_v6.add_vpp_config()
|
||||
|
||||
self.tun_protect_v6 = VppIpsecTunProtect(self,
|
||||
self.ipip6,
|
||||
self.tun_sa_out_v6,
|
||||
[self.tun_sa_in_v6])
|
||||
|
||||
self.tun_protect_v6.add_vpp_config()
|
||||
|
||||
#
|
||||
# IPv6/IPv4 - IPSEC
|
||||
#
|
||||
ipsec64 = (Ether(src=self.pg2.remote_mac, dst="02:fe:60:1e:a2:79") /
|
||||
IP(src=self.pg2.remote_ip4, dst="172.16.10.3", flags='DF') /
|
||||
TCP(sport=1234, dport=1234) /
|
||||
Raw(b'\xa5' * 65200))
|
||||
|
||||
rxs = self.send_and_expect(self.pg2, [ipsec64], self.pg0, 45)
|
||||
size = 0
|
||||
for rx in rxs:
|
||||
self.assertEqual(rx[Ether].src, self.pg0.local_mac)
|
||||
self.assertEqual(rx[Ether].dst, self.pg0.remote_mac)
|
||||
self.assertEqual(rx[IPv6].src, self.pg0.local_ip6)
|
||||
self.assertEqual(rx[IPv6].dst, self.pg0.remote_ip6)
|
||||
self.assertEqual(ipv6nh[rx[IPv6].nh], "ESP Header")
|
||||
self.assertEqual(rx[ESP].spi, self.ipv6_params.scapy_tun_spi)
|
||||
inner = self.ipv6_params.vpp_tun_sa.decrypt(rx[IPv6])
|
||||
self.assertEqual(inner[IP].src, self.pg2.remote_ip4)
|
||||
self.assertEqual(inner[IP].dst, "172.16.10.3")
|
||||
size += inner[IP].len - 20 - 20
|
||||
self.assertEqual(size, 65200)
|
||||
|
||||
self.ip6_via_ip6_tunnel = VppIpRoute(
|
||||
self, "fd01:10::", 64,
|
||||
[VppRoutePath("::",
|
||||
self.ipip6.sw_if_index,
|
||||
proto=FibPathProto.FIB_PATH_NH_PROTO_IP6)])
|
||||
self.ip6_via_ip6_tunnel.add_vpp_config()
|
||||
|
||||
#
|
||||
# IPv6/IPv6 - IPSEC
|
||||
#
|
||||
ipsec66 = (Ether(src=self.pg2.remote_mac, dst="02:fe:60:1e:a2:79") /
|
||||
IPv6(src=self.pg2.remote_ip6, dst="fd01:10::3") /
|
||||
TCP(sport=1234, dport=1234) /
|
||||
Raw(b'\xa5' * 65200))
|
||||
|
||||
rxs = self.send_and_expect(self.pg2, [ipsec66], self.pg0, 45)
|
||||
size = 0
|
||||
for rx in rxs:
|
||||
self.assertEqual(rx[Ether].src, self.pg0.local_mac)
|
||||
self.assertEqual(rx[Ether].dst, self.pg0.remote_mac)
|
||||
self.assertEqual(rx[IPv6].src, self.pg0.local_ip6)
|
||||
self.assertEqual(rx[IPv6].dst, self.pg0.remote_ip6)
|
||||
self.assertEqual(ipv6nh[rx[IPv6].nh], "ESP Header")
|
||||
self.assertEqual(rx[ESP].spi, self.ipv6_params.scapy_tun_spi)
|
||||
inner = self.ipv6_params.vpp_tun_sa.decrypt(rx[IPv6])
|
||||
self.assertEqual(inner[IPv6].src, self.pg2.remote_ip6)
|
||||
self.assertEqual(inner[IPv6].dst, "fd01:10::3")
|
||||
size += inner[IPv6].plen - 20
|
||||
self.assertEqual(size, 65200)
|
||||
|
||||
# disable IPSec
|
||||
self.tun_protect_v6.remove_vpp_config()
|
||||
self.tun_sa_in_v6.remove_vpp_config()
|
||||
self.tun_sa_out_v6.remove_vpp_config()
|
||||
|
||||
#
|
||||
# disable ipip6
|
||||
#
|
||||
self.ip4_via_ip6_tunnel.remove_vpp_config()
|
||||
self.ip6_via_ip6_tunnel.remove_vpp_config()
|
||||
self.ipip6.remove_vpp_config()
|
||||
|
||||
self.vapi.feature_gso_enable_disable(self.pg0.sw_if_index,
|
||||
enable_disable=0)
|
||||
|
||||
if __name__ == '__main__':
|
||||
unittest.main(testRunner=VppTestRunner)
|
||||
|
Reference in New Issue
Block a user