f0a126a1eb
Type: improvement Signed-off-by: Mohsin Kazmi <sykazmi@cisco.com> Change-Id: Ief1e97d03b7a934547add35ac3ed1f93f2499a20
1071 lines
39 KiB
Python
1071 lines
39 KiB
Python
#!/usr/bin/env python3
|
|
|
|
import unittest
|
|
|
|
from framework import VppTestCase
|
|
from asfframework import VppTestRunner
|
|
from vpp_ip import INVALID_INDEX
|
|
from itertools import product
|
|
from config import config
|
|
|
|
from scapy.packet import Raw
|
|
from scapy.layers.l2 import Ether
|
|
from scapy.layers.inet import IP, UDP, TCP, ICMP
|
|
from scapy.layers.inet import IPerror, TCPerror, UDPerror, ICMPerror
|
|
from scapy.layers.inet6 import IPv6, IPerror6, ICMPv6DestUnreach
|
|
from scapy.layers.inet6 import ICMPv6EchoRequest, ICMPv6EchoReply
|
|
from scapy.layers.inet6 import ICMPv6TimeExceeded
|
|
|
|
from ipaddress import ip_network
|
|
|
|
from vpp_object import VppObject
|
|
from vpp_papi import VppEnum
|
|
|
|
N_PKTS = 15
|
|
N_REMOTE_HOSTS = 3
|
|
|
|
SRC = 0
|
|
DST = 1
|
|
|
|
|
|
class CnatCommonTestCase(VppTestCase):
|
|
"""CNat common test class"""
|
|
|
|
#
|
|
# turn the scanner off whilst testing otherwise sessions
|
|
# will time out
|
|
#
|
|
extra_vpp_config = [
|
|
"cnat",
|
|
"{",
|
|
"session-db-buckets",
|
|
"64",
|
|
"session-cleanup-timeout",
|
|
"0.1",
|
|
"session-max-age",
|
|
"1",
|
|
"tcp-max-age",
|
|
"1",
|
|
"scanner",
|
|
"off",
|
|
"}",
|
|
]
|
|
|
|
@classmethod
|
|
def setUpClass(cls):
|
|
super(CnatCommonTestCase, cls).setUpClass()
|
|
|
|
@classmethod
|
|
def tearDownClass(cls):
|
|
super(CnatCommonTestCase, cls).tearDownClass()
|
|
|
|
|
|
class Endpoint(object):
|
|
"""CNat endpoint"""
|
|
|
|
def __init__(self, pg=None, pgi=None, port=0, is_v6=False, ip=None):
|
|
self.port = port
|
|
self.is_v6 = is_v6
|
|
self.sw_if_index = INVALID_INDEX
|
|
if pg is not None and pgi is not None:
|
|
# pg interface specified and remote index
|
|
self.ip = self.get_ip46(pg.remote_hosts[pgi])
|
|
elif pg is not None:
|
|
self.ip = None
|
|
self.sw_if_index = pg.sw_if_index
|
|
elif ip is not None:
|
|
self.ip = ip
|
|
else:
|
|
self.ip = "::" if self.is_v6 else "0.0.0.0"
|
|
|
|
def get_ip46(self, obj):
|
|
if self.is_v6:
|
|
return obj.ip6
|
|
return obj.ip4
|
|
|
|
def udpate(self, **kwargs):
|
|
self.__init__(**kwargs)
|
|
|
|
def _vpp_if_af(self):
|
|
if self.is_v6:
|
|
return VppEnum.vl_api_address_family_t.ADDRESS_IP6
|
|
return VppEnum.vl_api_address_family_t.ADDRESS_IP4
|
|
|
|
def encode(self):
|
|
return {
|
|
"addr": self.ip,
|
|
"port": self.port,
|
|
"sw_if_index": self.sw_if_index,
|
|
"if_af": self._vpp_if_af(),
|
|
}
|
|
|
|
def __str__(self):
|
|
return "%s:%d" % (self.ip, self.port)
|
|
|
|
|
|
class Translation(VppObject):
|
|
def __init__(self, test, iproto, vip, paths, fhc):
|
|
self._test = test
|
|
self.vip = vip
|
|
self.iproto = iproto
|
|
self.paths = paths
|
|
self.fhc = fhc
|
|
self.id = None
|
|
|
|
def __str__(self):
|
|
return "%s %s %s" % (self.vip, self.iproto, self.paths)
|
|
|
|
def _vl4_proto(self):
|
|
ip_proto = VppEnum.vl_api_ip_proto_t
|
|
return {
|
|
UDP: ip_proto.IP_API_PROTO_UDP,
|
|
TCP: ip_proto.IP_API_PROTO_TCP,
|
|
}[self.iproto]
|
|
|
|
def _encoded_paths(self):
|
|
return [
|
|
{"src_ep": src.encode(), "dst_ep": dst.encode()}
|
|
for (src, dst) in self.paths
|
|
]
|
|
|
|
def add_vpp_config(self):
|
|
r = self._test.vapi.cnat_translation_update(
|
|
{
|
|
"vip": self.vip.encode(),
|
|
"ip_proto": self._vl4_proto(),
|
|
"n_paths": len(self.paths),
|
|
"paths": self._encoded_paths(),
|
|
"flow_hash_config": self.fhc,
|
|
}
|
|
)
|
|
self._test.registry.register(self, self._test.logger)
|
|
self.id = r.id
|
|
return self
|
|
|
|
def remove_vpp_config(self):
|
|
assert self.id is not None
|
|
self._test.vapi.cnat_translation_del(id=self.id)
|
|
return self
|
|
|
|
def query_vpp_config(self):
|
|
for t in self._test.vapi.cnat_translation_dump():
|
|
if self.id == t.translation.id:
|
|
return t.translation
|
|
return None
|
|
|
|
|
|
class CnatTestContext(object):
|
|
"""
|
|
Usage :
|
|
|
|
ctx = CnatTestContext(self, TCP, is_v6=True)
|
|
|
|
# send pg0.remote[0]:1234 -> pg1.remote[0]:6661
|
|
ctx.cnat_send(self.pg0, 0, 1234, self.pg1, 0, 6661)
|
|
|
|
# We expect this to be NATed as
|
|
# pg2.remote[0]:<anyport> -> pg1.remote[0]:6661
|
|
ctx.cnat_expect(self.pg2, 0, None, self.pg1, 0, 6661)
|
|
|
|
# After running cnat_expect, we can send back the received packet
|
|
# and expect it be 'unnated' so that we get the original packet
|
|
ctx.cnat_send_return().cnat_expect_return()
|
|
|
|
# same thing for ICMP errors
|
|
ctx.cnat_send_icmp_return_error().cnat_expect_icmp_error_return()
|
|
"""
|
|
|
|
def __init__(self, test, L4PROTO, is_v6):
|
|
self.L4PROTO = L4PROTO
|
|
self.is_v6 = is_v6
|
|
self._test = test
|
|
|
|
def get_ip46(self, obj):
|
|
if self.is_v6:
|
|
return obj.ip6
|
|
return obj.ip4
|
|
|
|
@property
|
|
def IP46(self):
|
|
return IPv6 if self.is_v6 else IP
|
|
|
|
def cnat_send(
|
|
self, src_pg, src_id, src_port, dst_pg, dst_id, dst_port, no_replies=False
|
|
):
|
|
if isinstance(src_id, int):
|
|
self.src_addr = self.get_ip46(src_pg.remote_hosts[src_id])
|
|
else:
|
|
self.dst_addr = src_id
|
|
if isinstance(dst_id, int):
|
|
self.dst_addr = self.get_ip46(dst_pg.remote_hosts[dst_id])
|
|
else:
|
|
self.dst_addr = dst_id
|
|
self.src_port = src_port # also ICMP id
|
|
self.dst_port = dst_port # also ICMP type
|
|
|
|
if self.L4PROTO in [TCP, UDP]:
|
|
l4 = self.L4PROTO(sport=self.src_port, dport=self.dst_port)
|
|
elif self.L4PROTO in [ICMP] and not self.is_v6:
|
|
l4 = self.L4PROTO(id=self.src_port, type=self.dst_port)
|
|
elif self.L4PROTO in [ICMP] and self.is_v6:
|
|
l4 = ICMPv6EchoRequest(id=self.src_port)
|
|
p1 = (
|
|
Ether(src=src_pg.remote_mac, dst=src_pg.local_mac)
|
|
/ self.IP46(src=self.src_addr, dst=self.dst_addr)
|
|
/ l4
|
|
/ Raw()
|
|
)
|
|
|
|
if no_replies:
|
|
self._test.send_and_assert_no_replies(src_pg, p1 * N_PKTS, dst_pg)
|
|
else:
|
|
self.rxs = self._test.send_and_expect(src_pg, p1 * N_PKTS, dst_pg)
|
|
self.expected_src_pg = src_pg
|
|
self.expected_dst_pg = dst_pg
|
|
return self
|
|
|
|
def cnat_expect(self, src_pg, src_id, src_port, dst_pg, dst_id, dst_port):
|
|
if isinstance(src_id, int):
|
|
self.expect_src_addr = self.get_ip46(src_pg.remote_hosts[src_id])
|
|
else:
|
|
self.expect_src_addr = src_id
|
|
if isinstance(dst_id, int):
|
|
self.expect_dst_addr = self.get_ip46(dst_pg.remote_hosts[dst_id])
|
|
else:
|
|
self.expect_dst_addr = dst_id
|
|
self.expect_src_port = src_port
|
|
self.expect_dst_port = dst_port
|
|
|
|
if self.expect_src_port is None:
|
|
if self.L4PROTO in [TCP, UDP]:
|
|
self.expect_src_port = self.rxs[0][self.L4PROTO].sport
|
|
elif self.L4PROTO in [ICMP] and not self.is_v6:
|
|
self.expect_src_port = self.rxs[0][self.L4PROTO].id
|
|
elif self.L4PROTO in [ICMP] and self.is_v6:
|
|
self.expect_src_port = self.rxs[0][ICMPv6EchoRequest].id
|
|
|
|
for rx in self.rxs:
|
|
self._test.assert_packet_checksums_valid(rx)
|
|
self._test.assertEqual(rx[self.IP46].dst, self.expect_dst_addr)
|
|
self._test.assertEqual(rx[self.IP46].src, self.expect_src_addr)
|
|
if self.L4PROTO in [TCP, UDP]:
|
|
self._test.assertEqual(rx[self.L4PROTO].dport, self.expect_dst_port)
|
|
self._test.assertEqual(rx[self.L4PROTO].sport, self.expect_src_port)
|
|
elif self.L4PROTO in [ICMP] and not self.is_v6:
|
|
self._test.assertEqual(rx[self.L4PROTO].type, self.expect_dst_port)
|
|
self._test.assertEqual(rx[self.L4PROTO].id, self.expect_src_port)
|
|
elif self.L4PROTO in [ICMP] and self.is_v6:
|
|
self._test.assertEqual(rx[ICMPv6EchoRequest].id, self.expect_src_port)
|
|
return self
|
|
|
|
def cnat_send_return(self):
|
|
"""This sends the return traffic"""
|
|
if self.L4PROTO in [TCP, UDP]:
|
|
l4 = self.L4PROTO(sport=self.expect_dst_port, dport=self.expect_src_port)
|
|
elif self.L4PROTO in [ICMP] and not self.is_v6:
|
|
# icmp type 0 if echo reply
|
|
l4 = self.L4PROTO(id=self.expect_src_port, type=0)
|
|
elif self.L4PROTO in [ICMP] and self.is_v6:
|
|
l4 = ICMPv6EchoReply(id=self.expect_src_port)
|
|
src_mac = self.expected_dst_pg.remote_mac
|
|
p1 = (
|
|
Ether(src=src_mac, dst=self.expected_dst_pg.local_mac)
|
|
/ self.IP46(src=self.expect_dst_addr, dst=self.expect_src_addr)
|
|
/ l4
|
|
/ Raw()
|
|
)
|
|
|
|
self.return_rxs = self._test.send_and_expect(
|
|
self.expected_dst_pg, p1 * N_PKTS, self.expected_src_pg
|
|
)
|
|
return self
|
|
|
|
def cnat_expect_return(self):
|
|
for rx in self.return_rxs:
|
|
self._test.assert_packet_checksums_valid(rx)
|
|
self._test.assertEqual(rx[self.IP46].dst, self.src_addr)
|
|
self._test.assertEqual(rx[self.IP46].src, self.dst_addr)
|
|
if self.L4PROTO in [TCP, UDP]:
|
|
self._test.assertEqual(rx[self.L4PROTO].dport, self.src_port)
|
|
self._test.assertEqual(rx[self.L4PROTO].sport, self.dst_port)
|
|
elif self.L4PROTO in [ICMP] and not self.is_v6:
|
|
# icmp type 0 if echo reply
|
|
self._test.assertEqual(rx[self.L4PROTO].type, 0)
|
|
self._test.assertEqual(rx[self.L4PROTO].id, self.src_port)
|
|
elif self.L4PROTO in [ICMP] and self.is_v6:
|
|
self._test.assertEqual(rx[ICMPv6EchoReply].id, self.src_port)
|
|
return self
|
|
|
|
def cnat_send_icmp_return_error(self):
|
|
"""
|
|
This called after cnat_expect will send an icmp error
|
|
on the reverse path
|
|
"""
|
|
ICMPelem = ICMPv6DestUnreach(code=1) if self.is_v6 else ICMP(type=11)
|
|
InnerIP = self.rxs[0][self.IP46]
|
|
p1 = (
|
|
Ether(
|
|
src=self.expected_dst_pg.remote_mac, dst=self.expected_dst_pg.local_mac
|
|
)
|
|
/ self.IP46(src=self.expect_dst_addr, dst=self.expect_src_addr)
|
|
/ ICMPelem
|
|
/ InnerIP
|
|
)
|
|
self.return_rxs = self._test.send_and_expect(
|
|
self.expected_dst_pg, p1 * N_PKTS, self.expected_src_pg
|
|
)
|
|
return self
|
|
|
|
def cnat_expect_icmp_error_return(self):
|
|
ICMP46 = ICMPv6DestUnreach if self.is_v6 else ICMP
|
|
IP46err = IPerror6 if self.is_v6 else IPerror
|
|
L4err = TCPerror if self.L4PROTO is TCP else UDPerror
|
|
for rx in self.return_rxs:
|
|
self._test.assert_packet_checksums_valid(rx)
|
|
self._test.assertEqual(rx[self.IP46].dst, self.src_addr)
|
|
self._test.assertEqual(rx[self.IP46].src, self.dst_addr)
|
|
self._test.assertEqual(rx[ICMP46][IP46err].src, self.src_addr)
|
|
self._test.assertEqual(rx[ICMP46][IP46err].dst, self.dst_addr)
|
|
self._test.assertEqual(rx[ICMP46][IP46err][L4err].sport, self.src_port)
|
|
self._test.assertEqual(rx[ICMP46][IP46err][L4err].dport, self.dst_port)
|
|
return self
|
|
|
|
|
|
# -------------------------------------------------------------------
|
|
# -------------------------------------------------------------------
|
|
# -------------------------------------------------------------------
|
|
# -------------------------------------------------------------------
|
|
|
|
|
|
@unittest.skipIf("cnat" in config.excluded_plugins, "Exclude CNAT plugin tests")
|
|
class TestCNatTranslation(CnatCommonTestCase):
|
|
"""CNat Translation"""
|
|
|
|
@classmethod
|
|
def setUpClass(cls):
|
|
super(TestCNatTranslation, cls).setUpClass()
|
|
|
|
@classmethod
|
|
def tearDownClass(cls):
|
|
super(TestCNatTranslation, cls).tearDownClass()
|
|
|
|
def setUp(self):
|
|
super(TestCNatTranslation, self).setUp()
|
|
|
|
self.create_pg_interfaces(range(3))
|
|
self.pg0.generate_remote_hosts(N_REMOTE_HOSTS)
|
|
self.pg1.generate_remote_hosts(N_REMOTE_HOSTS)
|
|
|
|
for i in self.pg_interfaces:
|
|
i.admin_up()
|
|
i.config_ip4()
|
|
i.resolve_arp()
|
|
i.config_ip6()
|
|
i.resolve_ndp()
|
|
i.configure_ipv4_neighbors()
|
|
i.configure_ipv6_neighbors()
|
|
|
|
def tearDown(self):
|
|
for translation in self.translations:
|
|
translation.remove_vpp_config()
|
|
|
|
self.vapi.cnat_session_purge()
|
|
self.assertFalse(self.vapi.cnat_session_dump())
|
|
|
|
for i in self.pg_interfaces:
|
|
i.unconfig_ip4()
|
|
i.unconfig_ip6()
|
|
i.admin_down()
|
|
super(TestCNatTranslation, self).tearDown()
|
|
|
|
def cnat_fhc_translation(self):
|
|
"""CNat Translation"""
|
|
self.logger.info(self.vapi.cli("sh cnat client"))
|
|
self.logger.info(self.vapi.cli("sh cnat translation"))
|
|
|
|
for nbr, translation in enumerate(self.mbtranslations):
|
|
vip = translation.vip
|
|
|
|
#
|
|
# Flows to the VIP with same ips and different source ports are loadbalanced identically
|
|
# in both cases of flow hash 0x03 (src ip and dst ip) and 0x08 (dst port)
|
|
#
|
|
ctx = CnatTestContext(self, translation.iproto, vip.is_v6)
|
|
for src_pgi, sport in product(range(N_REMOTE_HOSTS), [1234, 1233]):
|
|
# from client to vip
|
|
ctx.cnat_send(self.pg0, src_pgi, sport, self.pg1, vip.ip, vip.port)
|
|
dport1 = ctx.rxs[0][ctx.L4PROTO].dport
|
|
ctx._test.assertIn(
|
|
dport1,
|
|
[translation.paths[0][DST].port, translation.paths[1][DST].port],
|
|
)
|
|
ctx.cnat_expect(self.pg0, src_pgi, sport, self.pg1, nbr, dport1)
|
|
|
|
ctx.cnat_send(
|
|
self.pg0, src_pgi, sport + 122, self.pg1, vip.ip, vip.port
|
|
)
|
|
dport2 = ctx.rxs[0][ctx.L4PROTO].dport
|
|
ctx._test.assertIn(
|
|
dport2,
|
|
[translation.paths[0][DST].port, translation.paths[1][DST].port],
|
|
)
|
|
ctx.cnat_expect(self.pg0, src_pgi, sport + 122, self.pg1, nbr, dport2)
|
|
|
|
ctx._test.assertEqual(dport1, dport2)
|
|
|
|
def cnat_translation(self):
|
|
"""CNat Translation"""
|
|
self.logger.info(self.vapi.cli("sh cnat client"))
|
|
self.logger.info(self.vapi.cli("sh cnat translation"))
|
|
|
|
for nbr, translation in enumerate(self.translations):
|
|
vip = translation.vip
|
|
|
|
#
|
|
# Test Flows to the VIP
|
|
#
|
|
ctx = CnatTestContext(self, translation.iproto, vip.is_v6)
|
|
for src_pgi, sport in product(range(N_REMOTE_HOSTS), [1234, 1233]):
|
|
# from client to vip
|
|
ctx.cnat_send(self.pg0, src_pgi, sport, self.pg1, vip.ip, vip.port)
|
|
dst_port = translation.paths[0][DST].port
|
|
ctx.cnat_expect(self.pg0, src_pgi, sport, self.pg1, nbr, dst_port)
|
|
# from vip to client
|
|
ctx.cnat_send_return().cnat_expect_return()
|
|
|
|
#
|
|
# packets to the VIP that do not match a
|
|
# translation are dropped
|
|
#
|
|
ctx.cnat_send(
|
|
self.pg0, src_pgi, sport, self.pg1, vip.ip, 6666, no_replies=True
|
|
)
|
|
|
|
#
|
|
# packets from the VIP that do not match a
|
|
# session are forwarded
|
|
#
|
|
ctx.cnat_send(self.pg1, nbr, 6666, self.pg0, src_pgi, sport)
|
|
ctx.cnat_expect(self.pg1, nbr, 6666, self.pg0, src_pgi, sport)
|
|
|
|
#
|
|
# modify the translation to use a different backend
|
|
#
|
|
old_dst_port = translation.paths[0][DST].port
|
|
translation.paths[0][DST].udpate(
|
|
pg=self.pg2, pgi=0, port=5000, is_v6=vip.is_v6
|
|
)
|
|
translation.add_vpp_config()
|
|
|
|
#
|
|
# existing flows follow the old path
|
|
#
|
|
for src_pgi in range(N_REMOTE_HOSTS):
|
|
for sport in [1234, 1233]:
|
|
# from client to vip
|
|
ctx.cnat_send(self.pg0, src_pgi, sport, self.pg1, vip.ip, vip.port)
|
|
ctx.cnat_expect(
|
|
self.pg0, src_pgi, sport, self.pg1, nbr, old_dst_port
|
|
)
|
|
# from vip to client
|
|
ctx.cnat_send_return().cnat_expect_return()
|
|
|
|
#
|
|
# new flows go to the new backend
|
|
#
|
|
for src_pgi in range(N_REMOTE_HOSTS):
|
|
ctx.cnat_send(self.pg0, src_pgi, 9999, self.pg2, vip.ip, vip.port)
|
|
ctx.cnat_expect(self.pg0, src_pgi, 9999, self.pg2, 0, 5000)
|
|
|
|
self.logger.info(self.vapi.cli("sh cnat session verbose"))
|
|
|
|
#
|
|
# turn the scanner back on and wait until the sessions
|
|
# all disapper
|
|
#
|
|
self.vapi.cli("test cnat scanner on")
|
|
self.virtual_sleep(2)
|
|
sessions = self.vapi.cnat_session_dump()
|
|
self.assertEqual(len(sessions), 0)
|
|
self.vapi.cli("test cnat scanner off")
|
|
|
|
#
|
|
# load some flows again and purge
|
|
#
|
|
for translation in self.translations:
|
|
vip = translation.vip
|
|
ctx = CnatTestContext(self, translation.iproto, vip.is_v6)
|
|
for src_pgi in range(N_REMOTE_HOSTS):
|
|
for sport in [1234, 1233]:
|
|
# from client to vip
|
|
ctx.cnat_send(self.pg0, src_pgi, sport, self.pg2, vip.ip, vip.port)
|
|
ctx.cnat_expect(self.pg0, src_pgi, sport, self.pg2, 0, 5000)
|
|
|
|
def _test_icmp(self):
|
|
#
|
|
# Testing ICMP
|
|
#
|
|
for nbr, translation in enumerate(self.translations):
|
|
vip = translation.vip
|
|
ctx = CnatTestContext(self, translation.iproto, vip.is_v6)
|
|
|
|
#
|
|
# NATing ICMP errors
|
|
#
|
|
ctx.cnat_send(self.pg0, 0, 1234, self.pg1, vip.ip, vip.port)
|
|
dst_port = translation.paths[0][DST].port
|
|
ctx.cnat_expect(self.pg0, 0, 1234, self.pg1, nbr, dst_port)
|
|
ctx.cnat_send_icmp_return_error().cnat_expect_icmp_error_return()
|
|
|
|
#
|
|
# ICMP errors with no VIP associated should not be
|
|
# modified
|
|
#
|
|
ctx.cnat_send(self.pg0, 0, 1234, self.pg2, 0, vip.port)
|
|
dst_port = translation.paths[0][DST].port
|
|
ctx.cnat_expect(self.pg0, 0, 1234, self.pg2, 0, vip.port)
|
|
ctx.cnat_send_icmp_return_error().cnat_expect_icmp_error_return()
|
|
|
|
def _make_multi_backend_translations(self):
|
|
self.translations = []
|
|
self.mbtranslations = []
|
|
self.mbtranslations.append(
|
|
Translation(
|
|
self,
|
|
TCP,
|
|
Endpoint(ip="30.0.0.5", port=5555, is_v6=False),
|
|
[
|
|
(
|
|
Endpoint(is_v6=False),
|
|
Endpoint(pg=self.pg1, pgi=0, port=4001, is_v6=False),
|
|
),
|
|
(
|
|
Endpoint(is_v6=False),
|
|
Endpoint(pg=self.pg1, pgi=0, port=4005, is_v6=False),
|
|
),
|
|
],
|
|
0x03, # hash only on dst ip and src ip
|
|
).add_vpp_config()
|
|
)
|
|
self.mbtranslations.append(
|
|
Translation(
|
|
self,
|
|
TCP,
|
|
Endpoint(ip="30.0.0.6", port=5555, is_v6=False),
|
|
[
|
|
(
|
|
Endpoint(is_v6=False),
|
|
Endpoint(pg=self.pg1, pgi=1, port=4006, is_v6=False),
|
|
),
|
|
(
|
|
Endpoint(is_v6=False),
|
|
Endpoint(pg=self.pg1, pgi=1, port=4007, is_v6=False),
|
|
),
|
|
],
|
|
0x08, # hash only on dst port
|
|
).add_vpp_config()
|
|
)
|
|
|
|
def _make_translations_v4(self):
|
|
self.translations = []
|
|
self.translations.append(
|
|
Translation(
|
|
self,
|
|
TCP,
|
|
Endpoint(ip="30.0.0.1", port=5555, is_v6=False),
|
|
[
|
|
(
|
|
Endpoint(is_v6=False),
|
|
Endpoint(pg=self.pg1, pgi=0, port=4001, is_v6=False),
|
|
)
|
|
],
|
|
0x9F,
|
|
).add_vpp_config()
|
|
)
|
|
self.translations.append(
|
|
Translation(
|
|
self,
|
|
TCP,
|
|
Endpoint(ip="30.0.0.2", port=5554, is_v6=False),
|
|
[
|
|
(
|
|
Endpoint(is_v6=False),
|
|
Endpoint(pg=self.pg1, pgi=1, port=4002, is_v6=False),
|
|
)
|
|
],
|
|
0x9F,
|
|
).add_vpp_config()
|
|
)
|
|
self.translations.append(
|
|
Translation(
|
|
self,
|
|
UDP,
|
|
Endpoint(ip="30.0.0.2", port=5553, is_v6=False),
|
|
[
|
|
(
|
|
Endpoint(is_v6=False),
|
|
Endpoint(pg=self.pg1, pgi=2, port=4003, is_v6=False),
|
|
)
|
|
],
|
|
0x9F,
|
|
).add_vpp_config()
|
|
)
|
|
|
|
def _make_translations_v6(self):
|
|
self.translations = []
|
|
self.translations.append(
|
|
Translation(
|
|
self,
|
|
TCP,
|
|
Endpoint(ip="30::1", port=5555, is_v6=True),
|
|
[
|
|
(
|
|
Endpoint(is_v6=True),
|
|
Endpoint(pg=self.pg1, pgi=0, port=4001, is_v6=True),
|
|
)
|
|
],
|
|
0x9F,
|
|
).add_vpp_config()
|
|
)
|
|
self.translations.append(
|
|
Translation(
|
|
self,
|
|
TCP,
|
|
Endpoint(ip="30::2", port=5554, is_v6=True),
|
|
[
|
|
(
|
|
Endpoint(is_v6=True),
|
|
Endpoint(pg=self.pg1, pgi=1, port=4002, is_v6=True),
|
|
)
|
|
],
|
|
0x9F,
|
|
).add_vpp_config()
|
|
)
|
|
self.translations.append(
|
|
Translation(
|
|
self,
|
|
UDP,
|
|
Endpoint(ip="30::2", port=5553, is_v6=True),
|
|
[
|
|
(
|
|
Endpoint(is_v6=True),
|
|
Endpoint(pg=self.pg1, pgi=2, port=4003, is_v6=True),
|
|
)
|
|
],
|
|
0x9F,
|
|
).add_vpp_config()
|
|
)
|
|
|
|
def test_icmp4(self):
|
|
# """ CNat Translation icmp v4 """
|
|
self._make_translations_v4()
|
|
self._test_icmp()
|
|
|
|
def test_icmp6(self):
|
|
# """ CNat Translation icmp v6 """
|
|
self._make_translations_v6()
|
|
self._test_icmp()
|
|
|
|
def test_cnat6(self):
|
|
# """ CNat Translation ipv6 """
|
|
self._make_translations_v6()
|
|
self.cnat_translation()
|
|
|
|
def test_cnat4(self):
|
|
# """ CNat Translation ipv4 """
|
|
self._make_translations_v4()
|
|
self.cnat_translation()
|
|
|
|
def test_cnat_fhc(self):
|
|
# """ CNat Translation flow hash config """
|
|
self._make_multi_backend_translations()
|
|
self.cnat_fhc_translation()
|
|
|
|
|
|
@unittest.skipIf("cnat" in config.excluded_plugins, "Exclude CNAT plugin tests")
|
|
class TestCNatSourceNAT(CnatCommonTestCase):
|
|
"""CNat Source NAT"""
|
|
|
|
@classmethod
|
|
def setUpClass(cls):
|
|
super(TestCNatSourceNAT, cls).setUpClass()
|
|
|
|
@classmethod
|
|
def tearDownClass(cls):
|
|
super(TestCNatSourceNAT, cls).tearDownClass()
|
|
|
|
def _enable_disable_snat(self, is_enable=True):
|
|
self.vapi.cnat_set_snat_addresses(
|
|
snat_ip4=self.pg2.remote_hosts[0].ip4,
|
|
snat_ip6=self.pg2.remote_hosts[0].ip6,
|
|
sw_if_index=INVALID_INDEX,
|
|
)
|
|
self.vapi.feature_enable_disable(
|
|
enable=1 if is_enable else 0,
|
|
arc_name="ip6-unicast",
|
|
feature_name="cnat-snat-ip6",
|
|
sw_if_index=self.pg0.sw_if_index,
|
|
)
|
|
self.vapi.feature_enable_disable(
|
|
enable=1 if is_enable else 0,
|
|
arc_name="ip4-unicast",
|
|
feature_name="cnat-snat-ip4",
|
|
sw_if_index=self.pg0.sw_if_index,
|
|
)
|
|
|
|
policie_tbls = VppEnum.vl_api_cnat_snat_policy_table_t
|
|
self.vapi.cnat_set_snat_policy(
|
|
policy=VppEnum.vl_api_cnat_snat_policies_t.CNAT_POLICY_IF_PFX
|
|
)
|
|
for i in self.pg_interfaces:
|
|
self.vapi.cnat_snat_policy_add_del_if(
|
|
sw_if_index=i.sw_if_index,
|
|
is_add=1 if is_enable else 0,
|
|
table=policie_tbls.CNAT_POLICY_INCLUDE_V6,
|
|
)
|
|
self.vapi.cnat_snat_policy_add_del_if(
|
|
sw_if_index=i.sw_if_index,
|
|
is_add=1 if is_enable else 0,
|
|
table=policie_tbls.CNAT_POLICY_INCLUDE_V4,
|
|
)
|
|
|
|
def setUp(self):
|
|
super(TestCNatSourceNAT, self).setUp()
|
|
|
|
self.create_pg_interfaces(range(3))
|
|
self.pg1.generate_remote_hosts(2)
|
|
|
|
for i in self.pg_interfaces:
|
|
i.admin_up()
|
|
i.config_ip4()
|
|
i.resolve_arp()
|
|
i.config_ip6()
|
|
i.resolve_ndp()
|
|
i.configure_ipv6_neighbors()
|
|
i.configure_ipv4_neighbors()
|
|
|
|
self._enable_disable_snat(is_enable=True)
|
|
|
|
def tearDown(self):
|
|
self._enable_disable_snat(is_enable=True)
|
|
|
|
self.vapi.cnat_session_purge()
|
|
for i in self.pg_interfaces:
|
|
i.unconfig_ip4()
|
|
i.unconfig_ip6()
|
|
i.admin_down()
|
|
super(TestCNatSourceNAT, self).tearDown()
|
|
|
|
def test_snat_v6(self):
|
|
# """ CNat Source Nat v6 """
|
|
self.sourcenat_test_tcp_udp_conf(TCP, is_v6=True)
|
|
self.sourcenat_test_tcp_udp_conf(UDP, is_v6=True)
|
|
self.sourcenat_test_icmp_echo_conf(is_v6=True)
|
|
self.sourcenat_test_icmp_traceroute_conf(is_v6=True)
|
|
|
|
def test_snat_v4(self):
|
|
# """ CNat Source Nat v4 """
|
|
self.sourcenat_test_tcp_udp_conf(TCP)
|
|
self.sourcenat_test_tcp_udp_conf(UDP)
|
|
self.sourcenat_test_icmp_echo_conf()
|
|
self.sourcenat_test_icmp_traceroute_conf()
|
|
|
|
def sourcenat_test_icmp_echo_conf(self, is_v6=False):
|
|
ctx = CnatTestContext(self, ICMP, is_v6=is_v6)
|
|
# 8 is ICMP type echo (v4 only)
|
|
ctx.cnat_send(self.pg0, 0, 0xFEED, self.pg1, 0, 8)
|
|
ctx.cnat_expect(self.pg2, 0, None, self.pg1, 0, 8)
|
|
ctx.cnat_send_return().cnat_expect_return()
|
|
|
|
def sourcenat_test_icmp_traceroute_conf(self, is_v6=False):
|
|
# IPv4 ICMP
|
|
if not is_v6:
|
|
# Create an ICMP traceroute packet with TTL set to 1.
|
|
# The CNAT translates the packet, but the NATted packet is dropped
|
|
# due to the TTL of 1. An ICMP Time Exceeded message is sent
|
|
# to the source (which is the NATted address).
|
|
# The packet will be translated once more to the original
|
|
# source IP address.
|
|
|
|
icmp = (
|
|
Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
|
|
/ IP(
|
|
ttl=1,
|
|
src=self.pg0.remote_hosts[0].ip4,
|
|
dst=self.pg1.remote_hosts[0].ip4,
|
|
)
|
|
/ ICMP(id=0xFEED, type=8) # ICMP Type Echo Request
|
|
/ Raw()
|
|
)
|
|
|
|
self.rxs = self.send_and_expect(self.pg0, icmp, self.pg0)
|
|
|
|
for rx in self.rxs:
|
|
self.assert_packet_checksums_valid(rx)
|
|
self.assertEqual(rx[IP].dst, self.pg0.remote_hosts[0].ip4)
|
|
self.assertEqual(rx[IP].src, "172.16.1.1")
|
|
self.assertEqual(rx[ICMP].type, 11) # ICMP Type 11 (Time Exceeded)
|
|
self.assertEqual(
|
|
rx[ICMP].code, 0
|
|
) # ICMP Code 0 (TTL Zero During Transit)
|
|
inner = rx[ICMP].payload
|
|
self.assertEqual(inner[IPerror].src, self.pg0.remote_hosts[0].ip4)
|
|
self.assertEqual(inner[IPerror].dst, self.pg1.remote_hosts[0].ip4)
|
|
self.assertEqual(inner[ICMPerror].type, 8) # ICMP Echo Request
|
|
self.assertEqual(inner[ICMPerror].id, 0xFEED)
|
|
|
|
# source ---> NATted Transit ---> Transit 2 ... ---> Transit N ---> Destination
|
|
# Simulate an ICMP Time Exceeded message arriving at the NATted Transit
|
|
# from the Transit N-2 node. This occurs because the NATted packet
|
|
# is dropped due to a TTL of 1.
|
|
# An ICMP Time Exceeded message is sent back to the source
|
|
# (initially the NATted address). The CNAT then translates the message
|
|
# back to the original source IP address.
|
|
|
|
# For ICMP based traffic, snat session uses identifier for session key.
|
|
# snat allocates a new identifier. To hit the snat session from Transit N-2
|
|
# to NATed Transit, packet should use snat allocated identifier. To get the
|
|
# snat allocated identifier, echo request will be sent and captured at the
|
|
# destination, taken out the identifier from the packet and use it to set
|
|
# the identifier in the ICMP time exceed packet
|
|
icmp[IP].ttl = 64
|
|
rxs = self.send_and_expect(self.pg0, icmp, self.pg1)
|
|
|
|
icmp_error = (
|
|
Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
|
|
/ IP(src="172.16.1.1", dst=self.pg2.remote_hosts[0].ip4)
|
|
/ ICMP(type=11, code=0)
|
|
/ IPerror(
|
|
src=self.pg2.remote_hosts[0].ip4, dst=self.pg1.remote_hosts[0].ip4
|
|
)
|
|
/ ICMPerror(id=rxs[0][ICMP].id, type=8)
|
|
/ Raw()
|
|
)
|
|
|
|
self.rxs = self.send_and_expect(self.pg1, icmp_error, self.pg0)
|
|
for rx in self.rxs:
|
|
self.assert_packet_checksums_valid(rx)
|
|
self.assertEqual(rx[IP].dst, self.pg0.remote_hosts[0].ip4)
|
|
self.assertEqual(rx[IP].src, "172.16.1.1")
|
|
self.assertEqual(rx[ICMP].type, 11) # ICMP Type 11 (Time Exceeded)
|
|
self.assertEqual(
|
|
rx[ICMP].code, 0
|
|
) # ICMP Code 0 (TTL Zero During Transit)
|
|
inner = rx[ICMP].payload
|
|
self.assertEqual(inner[IPerror].src, self.pg0.remote_hosts[0].ip4)
|
|
self.assertEqual(inner[IPerror].dst, self.pg1.remote_hosts[0].ip4)
|
|
self.assertEqual(inner[ICMPerror].type, 8) # ICMP Echo Request
|
|
self.assertEqual(inner[ICMPerror].id, 0xFEED)
|
|
|
|
# IPv6 ICMPv6
|
|
if is_v6:
|
|
|
|
# Create an ICMPv6 traceroute packet with Hop Limit set to 1.
|
|
# The CNAT translates the packet, but the NATted packet is dropped
|
|
# due to the Hop Limit of 1. An ICMPv6 Time Exceeded message is sent
|
|
# back to the source (which is the NATted address).
|
|
# The CNAT translates the message once more to restore
|
|
# the original source IPv6 address.
|
|
icmp6 = (
|
|
Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
|
|
/ IPv6(
|
|
hlim=1,
|
|
src=self.pg0.remote_hosts[0].ip6,
|
|
dst=self.pg1.remote_hosts[0].ip6,
|
|
)
|
|
/ ICMPv6EchoRequest(id=0xFEED)
|
|
/ Raw()
|
|
)
|
|
self.rxs = self.send_and_expect(self.pg0, icmp6, self.pg0)
|
|
|
|
for rx in self.rxs:
|
|
self.assert_packet_checksums_valid(rx)
|
|
self.assertEqual(rx[IPv6].dst, self.pg0.remote_hosts[0].ip6)
|
|
self.assertEqual(rx[IPv6].src, "fd01:1::1")
|
|
self.assertEqual(
|
|
rx[ICMPv6TimeExceeded].type, 3
|
|
) # ICMPv6 Type 3 (Time Exceeded)
|
|
self.assertEqual(
|
|
rx[ICMPv6TimeExceeded].code, 0
|
|
) # ICMPv6 Code 0 (TTL Zero During Transit)
|
|
inner = rx[ICMPv6TimeExceeded].payload
|
|
self.assertEqual(inner[IPerror6].src, self.pg0.remote_hosts[0].ip6)
|
|
self.assertEqual(inner[IPerror6].dst, self.pg1.remote_hosts[0].ip6)
|
|
self.assertEqual(
|
|
inner[ICMPv6EchoRequest].type, 128
|
|
) # ICMPv6 Echo Request
|
|
self.assertEqual(inner[ICMPv6EchoRequest].id, 0xFEED)
|
|
|
|
def sourcenat_test_tcp_udp_conf(self, L4PROTO, is_v6=False):
|
|
ctx = CnatTestContext(self, L4PROTO, is_v6)
|
|
# we should source NAT
|
|
ctx.cnat_send(self.pg0, 0, 1234, self.pg1, 0, 6661)
|
|
ctx.cnat_expect(self.pg2, 0, None, self.pg1, 0, 6661)
|
|
ctx.cnat_send_return().cnat_expect_return()
|
|
|
|
# exclude dst address of pg1.1 from snat
|
|
if is_v6:
|
|
exclude_prefix = ip_network(
|
|
"%s/100" % self.pg1.remote_hosts[1].ip6, strict=False
|
|
)
|
|
else:
|
|
exclude_prefix = ip_network(
|
|
"%s/16" % self.pg1.remote_hosts[1].ip4, strict=False
|
|
)
|
|
|
|
# add remote host to exclude list
|
|
self.vapi.cnat_snat_policy_add_del_exclude_pfx(prefix=exclude_prefix, is_add=1)
|
|
|
|
# We should not source NAT the id=1
|
|
ctx.cnat_send(self.pg0, 0, 1234, self.pg1, 1, 6661)
|
|
ctx.cnat_expect(self.pg0, 0, 1234, self.pg1, 1, 6661)
|
|
ctx.cnat_send_return().cnat_expect_return()
|
|
|
|
# But we should source NAT the id=0
|
|
ctx.cnat_send(self.pg0, 0, 1234, self.pg1, 0, 6661)
|
|
ctx.cnat_expect(self.pg2, 0, None, self.pg1, 0, 6661)
|
|
ctx.cnat_send_return().cnat_expect_return()
|
|
|
|
# remove remote host from exclude list
|
|
self.vapi.cnat_snat_policy_add_del_exclude_pfx(prefix=exclude_prefix, is_add=0)
|
|
self.vapi.cnat_session_purge()
|
|
|
|
# We should source NAT again
|
|
ctx.cnat_send(self.pg0, 0, 1234, self.pg1, 1, 6661)
|
|
ctx.cnat_expect(self.pg2, 0, None, self.pg1, 1, 6661)
|
|
ctx.cnat_send_return().cnat_expect_return()
|
|
|
|
# test return ICMP error nating
|
|
ctx.cnat_send(self.pg0, 0, 1234, self.pg1, 1, 6661)
|
|
ctx.cnat_expect(self.pg2, 0, None, self.pg1, 1, 6661)
|
|
ctx.cnat_send_icmp_return_error().cnat_expect_icmp_error_return()
|
|
|
|
self.vapi.cnat_session_purge()
|
|
|
|
|
|
@unittest.skipIf("cnat" in config.excluded_plugins, "Exclude CNAT plugin tests")
|
|
class TestCNatDHCP(CnatCommonTestCase):
|
|
"""CNat DHCP"""
|
|
|
|
@classmethod
|
|
def setUpClass(cls):
|
|
super(TestCNatDHCP, cls).setUpClass()
|
|
|
|
@classmethod
|
|
def tearDownClass(cls):
|
|
super(TestCNatDHCP, cls).tearDownClass()
|
|
|
|
def tearDown(self):
|
|
for i in self.pg_interfaces:
|
|
i.admin_down()
|
|
super(TestCNatDHCP, self).tearDown()
|
|
|
|
def make_addr(self, sw_if_index, addr_id, is_v6):
|
|
if is_v6:
|
|
return "fd01:%x::%u" % (sw_if_index, addr_id + 1)
|
|
return "172.16.%u.%u" % (sw_if_index, addr_id)
|
|
|
|
def make_prefix(self, sw_if_index, addr_id, is_v6):
|
|
if is_v6:
|
|
return "%s/128" % self.make_addr(sw_if_index, addr_id, is_v6)
|
|
return "%s/32" % self.make_addr(sw_if_index, addr_id, is_v6)
|
|
|
|
def check_resolved(self, tr, addr_id, is_v6=False):
|
|
qt = tr.query_vpp_config()
|
|
self.assertEqual(
|
|
str(qt.vip.addr), self.make_addr(tr.vip.sw_if_index, addr_id, is_v6)
|
|
)
|
|
self.assertEqual(len(qt.paths), len(tr.paths))
|
|
for path_tr, path_qt in zip(tr.paths, qt.paths):
|
|
src_qt = path_qt.src_ep
|
|
dst_qt = path_qt.dst_ep
|
|
src_tr, dst_tr = path_tr
|
|
self.assertEqual(
|
|
str(src_qt.addr), self.make_addr(src_tr.sw_if_index, addr_id, is_v6)
|
|
)
|
|
self.assertEqual(
|
|
str(dst_qt.addr), self.make_addr(dst_tr.sw_if_index, addr_id, is_v6)
|
|
)
|
|
|
|
def add_del_address(self, pg, addr_id, is_add=True, is_v6=False):
|
|
self.vapi.sw_interface_add_del_address(
|
|
sw_if_index=pg.sw_if_index,
|
|
prefix=self.make_prefix(pg.sw_if_index, addr_id, is_v6),
|
|
is_add=1 if is_add else 0,
|
|
)
|
|
|
|
def _test_dhcp_v46(self, is_v6):
|
|
self.create_pg_interfaces(range(4))
|
|
for i in self.pg_interfaces:
|
|
i.admin_up()
|
|
paths = [
|
|
(Endpoint(pg=self.pg1, is_v6=is_v6), Endpoint(pg=self.pg2, is_v6=is_v6)),
|
|
(Endpoint(pg=self.pg1, is_v6=is_v6), Endpoint(pg=self.pg3, is_v6=is_v6)),
|
|
]
|
|
ep = Endpoint(pg=self.pg0, is_v6=is_v6)
|
|
t = Translation(self, TCP, ep, paths, 0x9F).add_vpp_config()
|
|
# Add an address on every interface
|
|
# and check it is reflected in the cnat config
|
|
for pg in self.pg_interfaces:
|
|
self.add_del_address(pg, addr_id=0, is_add=True, is_v6=is_v6)
|
|
self.check_resolved(t, addr_id=0, is_v6=is_v6)
|
|
# Add a new address on every interface, remove the old one
|
|
# and check it is reflected in the cnat config
|
|
for pg in self.pg_interfaces:
|
|
self.add_del_address(pg, addr_id=1, is_add=True, is_v6=is_v6)
|
|
self.add_del_address(pg, addr_id=0, is_add=False, is_v6=is_v6)
|
|
self.check_resolved(t, addr_id=1, is_v6=is_v6)
|
|
# remove the configuration
|
|
for pg in self.pg_interfaces:
|
|
self.add_del_address(pg, addr_id=1, is_add=False, is_v6=is_v6)
|
|
t.remove_vpp_config()
|
|
|
|
def test_dhcp_v4(self):
|
|
self._test_dhcp_v46(False)
|
|
|
|
def test_dhcp_v6(self):
|
|
self._test_dhcp_v46(True)
|
|
|
|
def test_dhcp_snat(self):
|
|
self.create_pg_interfaces(range(1))
|
|
for i in self.pg_interfaces:
|
|
i.admin_up()
|
|
self.vapi.cnat_set_snat_addresses(sw_if_index=self.pg0.sw_if_index)
|
|
# Add an address on every interface
|
|
# and check it is reflected in the cnat config
|
|
for pg in self.pg_interfaces:
|
|
self.add_del_address(pg, addr_id=0, is_add=True, is_v6=False)
|
|
self.add_del_address(pg, addr_id=0, is_add=True, is_v6=True)
|
|
r = self.vapi.cnat_get_snat_addresses()
|
|
self.assertEqual(
|
|
str(r.snat_ip4),
|
|
self.make_addr(self.pg0.sw_if_index, addr_id=0, is_v6=False),
|
|
)
|
|
self.assertEqual(
|
|
str(r.snat_ip6), self.make_addr(self.pg0.sw_if_index, addr_id=0, is_v6=True)
|
|
)
|
|
# Add a new address on every interface, remove the old one
|
|
# and check it is reflected in the cnat config
|
|
for pg in self.pg_interfaces:
|
|
self.add_del_address(pg, addr_id=1, is_add=True, is_v6=False)
|
|
self.add_del_address(pg, addr_id=1, is_add=True, is_v6=True)
|
|
self.add_del_address(pg, addr_id=0, is_add=False, is_v6=False)
|
|
self.add_del_address(pg, addr_id=0, is_add=False, is_v6=True)
|
|
r = self.vapi.cnat_get_snat_addresses()
|
|
self.assertEqual(
|
|
str(r.snat_ip4),
|
|
self.make_addr(self.pg0.sw_if_index, addr_id=1, is_v6=False),
|
|
)
|
|
self.assertEqual(
|
|
str(r.snat_ip6), self.make_addr(self.pg0.sw_if_index, addr_id=1, is_v6=True)
|
|
)
|
|
# remove the configuration
|
|
for pg in self.pg_interfaces:
|
|
self.add_del_address(pg, addr_id=1, is_add=False, is_v6=False)
|
|
self.add_del_address(pg, addr_id=1, is_add=False, is_v6=True)
|
|
self.vapi.cnat_set_snat_addresses(sw_if_index=INVALID_INDEX)
|
|
|
|
|
|
if __name__ == "__main__":
|
|
unittest.main(testRunner=VppTestRunner)
|