2020-05-19 07:17:19 +00:00
|
|
|
#!/usr/bin/env python3
|
|
|
|
|
|
|
|
import unittest
|
|
|
|
|
|
|
|
from framework import VppTestCase, VppTestRunner
|
|
|
|
from vpp_ip import DpoProto
|
|
|
|
|
|
|
|
from scapy.packet import Raw
|
|
|
|
from scapy.layers.l2 import Ether
|
|
|
|
from scapy.layers.inet import IP, UDP, TCP
|
|
|
|
from scapy.layers.inet6 import IPv6
|
|
|
|
|
|
|
|
from ipaddress import ip_address, ip_network, \
|
|
|
|
IPv4Address, IPv6Address, IPv4Network, IPv6Network
|
|
|
|
|
|
|
|
from vpp_object import VppObject
|
|
|
|
from vpp_papi import VppEnum
|
|
|
|
|
|
|
|
# Number of copies of each crafted packet injected per send; the tests build
# their bursts as ``pkt * N_PKTS`` and size the expected counters from it.
N_PKTS = 15
|
|
|
|
|
|
|
|
|
|
|
|
def find_cnat_translation(test, id):
    """Report whether VPP currently has a CNat translation with this id.

    :param test: object whose ``vapi`` provides ``cnat_translation_dump()``
    :param id: translation id to look for
    :returns: True when some dumped entry's ``translation.id`` equals *id*,
        False otherwise (including when the dump is empty)
    """
    dumped = test.vapi.cnat_translation_dump()
    return any(id == entry.translation.id for entry in dumped)
|
|
|
|
|
|
|
|
|
|
|
|
class Ep(object):
    """ CNat endpoint: an address, a port and the L4 protocol class """

    def __init__(self, ip, port, l4p=TCP):
        """
        :param ip: endpoint address
        :param port: endpoint port number
        :param l4p: L4 packet class used for this endpoint (TCP by default)
        """
        self.ip, self.port, self.l4p = ip, port, l4p

    def encode(self):
        """ Return the endpoint as a dict with 'addr' and 'port' keys
        (the L4 protocol is not part of the encoding) """
        return dict(addr=self.ip, port=self.port)

    def __str__(self):
        """ Render as ``<ip>:<port>`` """
        rendered = "%s:%d" % (self.ip, self.port)
        return rendered
|
|
|
|
|
|
|
|
|
|
|
|
class EpTuple(object):
    """ CNat endpoint tuple: a source endpoint paired with a destination """

    def __init__(self, src, dst):
        """
        :param src: source endpoint; must provide ``encode()``
        :param dst: destination endpoint; must provide ``encode()``
        """
        self.src, self.dst = src, dst

    def encode(self):
        """ Return a dict holding both encoded endpoints under the
        'src_ep' and 'dst_ep' keys """
        encoded = {}
        encoded['src_ep'] = self.src.encode()
        encoded['dst_ep'] = self.dst.encode()
        return encoded

    def __str__(self):
        """ Render as ``<src>-><dst>`` """
        rendered = "%s->%s" % (self.src, self.dst)
        return rendered
|
|
|
|
|
|
|
|
|
|
|
|
class VppCNatTranslation(VppObject):
    """ CNat translation: a VIP endpoint and the paths it is mapped to """

    def __init__(self, test, iproto, vip, paths):
        """
        :param test: test case providing ``vapi``, ``registry``, ``logger``
            and ``statistics``
        :param iproto: L4 packet class (UDP or TCP) selecting the IP protocol
        :param vip: endpoint object; must provide ``encode()``
        :param paths: sequence of objects providing ``encode()``
        """
        self._test = test
        self.vip = vip
        self.iproto = iproto
        self.paths = paths
        self.encoded_paths = [p.encode() for p in self.paths]

    @property
    def vl4_proto(self):
        """ API ip_proto enum value for ``self.iproto``; raises KeyError
        when ``self.iproto`` is neither UDP nor TCP """
        api_protos = VppEnum.vl_api_ip_proto_t
        by_l4_class = {
            UDP: api_protos.IP_API_PROTO_UDP,
            TCP: api_protos.IP_API_PROTO_TCP,
        }
        return by_l4_class[self.iproto]

    def _update_args(self):
        """ Build the dict handed to ``cnat_translation_update`` """
        return {'vip': self.vip.encode(),
                'ip_proto': self.vl4_proto,
                'n_paths': len(self.paths),
                'paths': self.encoded_paths}

    def delete(self):
        """ Delete the translation from VPP by id; the reply is ignored """
        self._test.vapi.cnat_translation_del(id=self.id)

    def add_vpp_config(self):
        """ Create the translation in VPP, register this object with the
        test registry and remember the id returned in the reply """
        reply = self._test.vapi.cnat_translation_update(self._update_args())
        self._test.registry.register(self, self._test.logger)
        self.id = reply.id

    def modify_vpp_config(self, paths):
        """ Replace the paths and push the translation to VPP again.

        ``self.id`` is left untouched; the reply of the update is ignored.

        :param paths: new sequence of objects providing ``encode()``
        """
        self.paths = paths
        self.encoded_paths = [p.encode() for p in self.paths]
        self._test.vapi.cnat_translation_update(self._update_args())
        self._test.registry.register(self, self._test.logger)

    def remove_vpp_config(self):
        """ Delete the translation from VPP by id """
        self._test.vapi.cnat_translation_del(self.id)

    def query_vpp_config(self):
        """ True when the translation id shows up in VPP's dump """
        return find_cnat_translation(self._test, self.id)

    def object_id(self):
        """ Identifier string derived from the VIP """
        return "cnat-translation-%s" % (self.vip)

    def get_stats(self):
        """ Return this translation's entry of the
        "/net/cnat-translation" counter """
        counters = self._test.statistics.get_counter("/net/cnat-translation")
        # NOTE(review): index 0 is presumably the first thread - confirm
        return counters[0][self.id]
|
|
|
|
|
|
|
|
|
|
|
|
class VppCNATSourceNat(VppObject):
    """ CNat source NAT address plus an optional list of excluded subnets """

    def __init__(self, test, address, exclude_subnets=None):
        """
        :param test: test case providing ``vapi``
        :param address: source NAT address; its IP version selects whether
            it is programmed as the IPv4 or the IPv6 SNAT address
        :param exclude_subnets: optional list of prefixes that
            ``add_vpp_config`` adds as SNAT prefixes; when omitted each
            instance gets its own empty list
        """
        self._test = test
        self.address = address
        # A literal ``[]`` default would be one list object shared by every
        # instance created without the argument, so use a None sentinel.
        if exclude_subnets is None:
            exclude_subnets = []
        self.exclude_subnets = exclude_subnets

    def add_vpp_config(self):
        """ Program the SNAT address (v4 or v6 depending on the address
        version), then add every prefix in ``exclude_subnets`` """
        a = ip_address(self.address)
        if 4 == a.version:
            self._test.vapi.cnat_set_snat_addresses(snat_ip4=self.address)
        else:
            self._test.vapi.cnat_set_snat_addresses(snat_ip6=self.address)
        for subnet in self.exclude_subnets:
            self.cnat_exclude_subnet(subnet, True)

    def cnat_exclude_subnet(self, exclude_subnet, isAdd=True):
        """ Add or remove one SNAT prefix.

        :param exclude_subnet: prefix passed to ``cnat_add_del_snat_prefix``
        :param isAdd: truthy to add the prefix, falsy to remove it
        """
        add = 1 if isAdd else 0
        self._test.vapi.cnat_add_del_snat_prefix(
            prefix=exclude_subnet, is_add=add)

    def query_vpp_config(self):
        """ Always returns False; VPP is not queried """
        return False

    def remove_vpp_config(self):
        """ Does nothing and returns False; nothing is removed from VPP """
        return False
|
|
|
|
|
|
|
|
|
|
|
|
class TestCNatTranslation(VppTestCase):
    """ CNat Translation """

    # cnat startup section: one-second session and TCP max ages
    extra_vpp_punt_config = ["cnat", "{",
                             "session-max-age", "1",
                             "tcp-max-age", "1", "}"]

    @classmethod
    def setUpClass(cls):
        super(TestCNatTranslation, cls).setUpClass()

    @classmethod
    def tearDownClass(cls):
        super(TestCNatTranslation, cls).tearDownClass()

    def setUp(self):
        """ Create three pg interfaces, bring them up and configure and
        resolve both IPv4 and IPv6 on each """
        super(TestCNatTranslation, self).setUp()

        self.create_pg_interfaces(range(3))

        for i in self.pg_interfaces:
            i.admin_up()
            i.config_ip4()
            i.resolve_arp()
            i.config_ip6()
            i.resolve_ndp()

    def tearDown(self):
        """ Unconfigure addresses and bring every pg interface down """
        for i in self.pg_interfaces:
            i.unconfig_ip4()
            i.unconfig_ip6()
            i.admin_down()
        super(TestCNatTranslation, self).tearDown()

    def cnat_create_translation(self, vip, nbr, isV6=False):
        """ Create and configure a translation for *vip*.

        The destination is pg1's remote host number *nbr* on port
        ``4000 + nbr``; the source endpoint is the all-zeros address with
        port 0. The same path is installed twice.

        :param vip: Ep describing the virtual IP, port and L4 protocol
        :param nbr: index into ``self.pg1.remote_hosts``
        :param isV6: use IPv6 addresses when True, IPv4 otherwise
        :returns: the configured VppCNatTranslation
        """
        ip_v = "ip6" if isV6 else "ip4"
        dep = Ep(getattr(self.pg1.remote_hosts[nbr], ip_v), 4000 + nbr)
        sep = Ep("::", 0) if isV6 else Ep("0.0.0.0", 0)
        t1 = VppCNatTranslation(
            self, vip.l4p, vip,
            [EpTuple(sep, dep), EpTuple(sep, dep)])
        t1.add_vpp_config()
        return t1

    def cnat_test_translation(self, t1, nbr, sports, isV6=False):
        """ Send flows through translation *t1* and verify the rewrites.

        For every pg0 remote host and every source port this checks the
        client->VIP direction, the return direction, that traffic to an
        untranslated VIP port is dropped and that traffic from the backend
        without a session is forwarded. Finally the translation's packet
        counter is compared with the number of client->VIP packets sent.

        :param t1: translation under test
        :param nbr: index of the pg1 remote host backing the translation
        :param sports: client source ports to use
        :param isV6: use IPv6 when True, IPv4 otherwise
        """
        ip_v = "ip6" if isV6 else "ip4"
        ip_class = IPv6 if isV6 else IP
        vip = t1.vip

        #
        # Flows
        #
        for src in self.pg0.remote_hosts:
            for sport in sports:
                # from client to vip
                p1 = (Ether(dst=self.pg0.local_mac,
                            src=src.mac) /
                      ip_class(src=getattr(src, ip_v), dst=vip.ip) /
                      vip.l4p(sport=sport, dport=vip.port) /
                      Raw())

                self.vapi.cli("trace add pg-input 1")
                rxs = self.send_and_expect(self.pg0,
                                           p1 * N_PKTS,
                                           self.pg1)

                for rx in rxs:
                    self.assert_packet_checksums_valid(rx)
                    self.assertEqual(
                        rx[ip_class].dst,
                        getattr(self.pg1.remote_hosts[nbr], ip_v))
                    self.assertEqual(rx[vip.l4p].dport, 4000 + nbr)
                    self.assertEqual(
                        rx[ip_class].src,
                        getattr(src, ip_v))
                    self.assertEqual(rx[vip.l4p].sport, sport)

                # from vip to client
                p1 = (Ether(dst=self.pg1.local_mac,
                            src=self.pg1.remote_mac) /
                      ip_class(src=getattr(
                          self.pg1.remote_hosts[nbr],
                          ip_v),
                          dst=getattr(src, ip_v)) /
                      vip.l4p(sport=4000 + nbr, dport=sport) /
                      Raw())

                rxs = self.send_and_expect(self.pg1,
                                           p1 * N_PKTS,
                                           self.pg0)

                for rx in rxs:
                    self.assert_packet_checksums_valid(rx)
                    self.assertEqual(
                        rx[ip_class].dst,
                        getattr(src, ip_v))
                    self.assertEqual(rx[vip.l4p].dport, sport)
                    self.assertEqual(rx[ip_class].src, vip.ip)
                    self.assertEqual(rx[vip.l4p].sport, vip.port)

                #
                # packets to the VIP that do not match a
                # translation are dropped
                #
                p1 = (Ether(dst=self.pg0.local_mac,
                            src=src.mac) /
                      ip_class(src=getattr(src, ip_v), dst=vip.ip) /
                      vip.l4p(sport=sport, dport=6666) /
                      Raw())

                self.send_and_assert_no_replies(self.pg0,
                                                p1 * N_PKTS,
                                                self.pg1)

                #
                # packets from the VIP that do not match a
                # session are forwarded
                #
                p1 = (Ether(dst=self.pg1.local_mac,
                            src=self.pg1.remote_mac) /
                      ip_class(src=getattr(
                          self.pg1.remote_hosts[nbr],
                          ip_v),
                          dst=getattr(src, ip_v)) /
                      vip.l4p(sport=6666, dport=sport) /
                      Raw())

                # only arrival on pg0 is checked, not the packet contents
                self.send_and_expect(self.pg1,
                                     p1 * N_PKTS,
                                     self.pg0)

        # one burst of N_PKTS per (client, source port) hit the translation
        self.assertEqual(t1.get_stats()['packets'],
                         N_PKTS *
                         len(sports) *
                         len(self.pg0.remote_hosts))

    def cnat_test_translation_update(self, t1, sports, isV6=False):
        """ Re-point *t1* at pg2 and check old and new flows.

        Flows using the ports in *sports* must still come out of pg1,
        while a flow from a new source port (9999) must come out of pg2.

        :param t1: translation to modify
        :param sports: source ports of the already established flows
        :param isV6: use IPv6 when True, IPv4 otherwise
        """
        ip_v = "ip6" if isV6 else "ip4"
        ip_class = IPv6 if isV6 else IP
        vip = t1.vip

        #
        # modify the translation to use a different backend
        #
        dep = Ep(getattr(self.pg2, 'remote_' + ip_v), 5000)
        sep = Ep("::", 0) if isV6 else Ep("0.0.0.0", 0)
        t1.modify_vpp_config([EpTuple(sep, dep)])

        #
        # existing flows follow the old path
        #
        for src in self.pg0.remote_hosts:
            for sport in sports:
                # from client to vip
                p1 = (Ether(dst=self.pg0.local_mac,
                            src=src.mac) /
                      ip_class(src=getattr(src, ip_v), dst=vip.ip) /
                      vip.l4p(sport=sport, dport=vip.port) /
                      Raw())

                self.send_and_expect(self.pg0,
                                     p1 * N_PKTS,
                                     self.pg1)

        #
        # new flows go to the new backend
        #
        for src in self.pg0.remote_hosts:
            p1 = (Ether(dst=self.pg0.local_mac,
                        src=src.mac) /
                  ip_class(src=getattr(src, ip_v), dst=vip.ip) /
                  vip.l4p(sport=9999, dport=vip.port) /
                  Raw())

            self.send_and_expect(self.pg0,
                                 p1 * N_PKTS,
                                 self.pg2)

    def cnat_translation(self, vips, isV6=False):
        """ CNat Translation

        Creates one translation per VIP, runs the translation and update
        checks on each, waits for the sessions to age out once the scanner
        is re-enabled, then reloads flows, deletes the translations and
        verifies that a session purge empties the session table.

        :param vips: list of Ep objects to create translations for
        :param isV6: use IPv6 when True, IPv4 otherwise
        """

        ip_class = IPv6 if isV6 else IP
        ip_v = "ip6" if isV6 else "ip4"
        sports = [1234, 1233]

        #
        # turn the scanner off whilst testing otherwise sessions
        # will time out
        #
        self.vapi.cli("test cnat scanner off")

        # the dump is issued before any translation exists; its result
        # is not needed
        self.vapi.cnat_session_dump()

        trs = []
        for nbr, vip in enumerate(vips):
            trs.append(self.cnat_create_translation(vip, nbr, isV6=isV6))

        self.logger.info(self.vapi.cli("sh cnat client"))
        self.logger.info(self.vapi.cli("sh cnat translation"))

        #
        # translations
        #
        for nbr, tr in enumerate(trs):
            self.cnat_test_translation(tr, nbr, sports, isV6=isV6)
            self.cnat_test_translation_update(tr, sports, isV6=isV6)
            if isV6:
                self.logger.info(self.vapi.cli(
                    "sh ip6 fib %s" % self.pg0.remote_ip6))
            else:
                self.logger.info(self.vapi.cli(
                    "sh ip fib %s" % self.pg0.remote_ip4))
            self.logger.info(self.vapi.cli("sh cnat session verbose"))

        #
        # turn the scanner back on and wait until the sessions
        # all disappear
        #
        self.vapi.cli("test cnat scanner on")

        n_tries = 0
        sessions = self.vapi.cnat_session_dump()
        while sessions and n_tries < 100:
            n_tries += 1
            # sleep first, then poll, so no time is wasted once the dump
            # comes back empty
            self.sleep(2)
            sessions = self.vapi.cnat_session_dump()

        # assert on the sessions themselves rather than the try counter,
        # so sessions that vanish on the very last poll still pass
        self.assertFalse(sessions)

        #
        # load some flows again and purge
        #
        for vip in vips:
            for src in self.pg0.remote_hosts:
                for sport in sports:
                    # from client to vip
                    p1 = (Ether(dst=self.pg0.local_mac,
                                src=src.mac) /
                          ip_class(src=getattr(src, ip_v), dst=vip.ip) /
                          vip.l4p(sport=sport, dport=vip.port) /
                          Raw())
                    self.send_and_expect(self.pg0,
                                         p1 * N_PKTS,
                                         self.pg2)

        for tr in trs:
            tr.delete()

        # sessions outlive their translations until explicitly purged
        self.assertTrue(self.vapi.cnat_session_dump())
        self.vapi.cnat_session_purge()
        self.assertFalse(self.vapi.cnat_session_dump())

    def test_cnat6(self):
        # """ CNat Translation ipv6 """
        vips = [
            Ep("30::1", 5555),
            Ep("30::2", 5554),
            Ep("30::2", 5553, UDP),
        ]

        self.pg0.generate_remote_hosts(len(vips))
        self.pg0.configure_ipv6_neighbors()
        self.pg1.generate_remote_hosts(len(vips))
        self.pg1.configure_ipv6_neighbors()

        self.cnat_translation(vips, isV6=True)

    def test_cnat4(self):
        # """ CNat Translation ipv4 """

        vips = [
            Ep("30.0.0.1", 5555),
            Ep("30.0.0.2", 5554),
            Ep("30.0.0.2", 5553, UDP),
        ]

        self.pg0.generate_remote_hosts(len(vips))
        self.pg0.configure_ipv4_neighbors()
        self.pg1.generate_remote_hosts(len(vips))
        self.pg1.configure_ipv4_neighbors()

        self.cnat_translation(vips)
|
|
|
|
|
|
|
|
|
|
|
|
class TestCNatSourceNAT(VppTestCase):
    """ CNat Source NAT """

    # cnat startup section: one-second session and TCP max ages
    extra_vpp_punt_config = ["cnat", "{",
                             "session-max-age", "1",
                             "tcp-max-age", "1", "}"]

    @classmethod
    def setUpClass(cls):
        super(TestCNatSourceNAT, cls).setUpClass()

    @classmethod
    def tearDownClass(cls):
        super(TestCNatSourceNAT, cls).tearDownClass()

    def setUp(self):
        """ Create three pg interfaces, bring them up and configure and
        resolve both IPv4 and IPv6 on each """
        super(TestCNatSourceNAT, self).setUp()

        self.create_pg_interfaces(range(3))

        for pg_if in self.pg_interfaces:
            pg_if.admin_up()
            pg_if.config_ip4()
            pg_if.resolve_arp()
            pg_if.config_ip6()
            pg_if.resolve_ndp()

    def tearDown(self):
        """ Unconfigure addresses and bring every pg interface down """
        for pg_if in self.pg_interfaces:
            pg_if.unconfig_ip4()
            pg_if.unconfig_ip6()
            pg_if.admin_down()
        super(TestCNatSourceNAT, self).tearDown()

    def cnat_set_snat_address(self, srcNatAddr, interface, isV6=False):
        """ Configure the SNAT address and enable the cnat-snat feature.

        :param srcNatAddr: address to source-NAT to
        :param interface: interface on whose unicast arc the feature is
            enabled
        :param isV6: pick the ip6 arc/feature names when True, ip4 otherwise
        :returns: the configured VppCNATSourceNat object
        """
        snat = VppCNATSourceNat(self, srcNatAddr)
        snat.add_vpp_config()

        if isV6:
            arc, feature = "ip6-unicast", "ip6-cnat-snat"
        else:
            arc, feature = "ip4-unicast", "ip4-cnat-snat"

        self.vapi.feature_enable_disable(
            enable=1,
            arc_name=arc,
            feature_name=feature,
            sw_if_index=interface.sw_if_index)

        return snat

    def cnat_test_sourcenat(self, srcNatAddr, l4p=TCP, isV6=False):
        """ Exercise source NAT between pg0 and each pg1 remote host.

        For every pg1 remote host this checks that traffic from pg0 is
        rewritten to *srcNatAddr*, that return traffic reaches the
        original sender, that adding the host's subnet as an SNAT prefix
        leaves the source untouched, and that removing it restores the
        rewrite.

        :param srcNatAddr: address to source-NAT to
        :param l4p: L4 packet class to use (TCP by default)
        :param isV6: use IPv6 when True, IPv4 otherwise
        """
        ip_v = "ip6" if isV6 else "ip4"
        ip_class = IPv6 if isV6 else IP
        sports = [1234, 1235, 1236]
        dports = [6661, 6662, 6663]

        self.pg0.generate_remote_hosts(1)
        self.pg0.configure_ipv4_neighbors()
        self.pg0.configure_ipv6_neighbors()
        self.pg1.generate_remote_hosts(len(sports))
        self.pg1.configure_ipv4_neighbors()
        self.pg1.configure_ipv6_neighbors()

        self.vapi.cli("test cnat scanner on")
        snat = self.cnat_set_snat_address(srcNatAddr, self.pg0, isV6)

        # the single host behind pg0 that originates the traffic
        pod = self.pg0.remote_hosts[0]
        pod_ip = getattr(pod, ip_v)

        for nbr, remote_host in enumerate(self.pg1.remote_hosts):
            host_ip = getattr(remote_host, ip_v)
            sent_sport = sports[nbr]
            sent_dport = dports[nbr]

            # from pods to outside network
            outbound = (
                Ether(
                    dst=self.pg0.local_mac,
                    src=pod.mac) /
                ip_class(
                    src=pod_ip,
                    dst=host_ip) /
                l4p(sport=sent_sport, dport=sent_dport) /
                Raw())

            rxs = self.send_and_expect(
                self.pg0,
                outbound * N_PKTS,
                self.pg1)
            for rx in rxs:
                self.assert_packet_checksums_valid(rx)
                self.assertEqual(rx[ip_class].dst, host_ip)
                self.assertEqual(rx[l4p].dport, sent_dport)
                self.assertEqual(rx[ip_class].src, srcNatAddr)
                # source port seen after NAT; the reply is addressed to it
                sport = rx[l4p].sport

            # from outside to pods
            inbound = (
                Ether(
                    dst=self.pg1.local_mac,
                    src=remote_host.mac) /
                ip_class(src=host_ip, dst=srcNatAddr) /
                l4p(sport=sent_dport, dport=sport) /
                Raw())

            rxs = self.send_and_expect(
                self.pg1,
                inbound * N_PKTS,
                self.pg0)

            for rx in rxs:
                self.assert_packet_checksums_valid(rx)
                self.assertEqual(rx[ip_class].dst, pod_ip)
                self.assertEqual(rx[l4p].dport, sent_sport)
                self.assertEqual(rx[l4p].sport, sent_dport)
                self.assertEqual(rx[ip_class].src, host_ip)

            # add remote host to exclude list
            subnet_mask = 100 if isV6 else 16
            subnet = host_ip + "/" + str(subnet_mask)
            exclude_subnet = ip_network(subnet, strict=False)

            snat.cnat_exclude_subnet(exclude_subnet)
            self.vapi.cnat_session_purge()

            rxs = self.send_and_expect(
                self.pg0,
                outbound * N_PKTS,
                self.pg1)
            for rx in rxs:
                self.assert_packet_checksums_valid(rx)
                self.assertEqual(rx[ip_class].dst, host_ip)
                self.assertEqual(rx[l4p].dport, sent_dport)
                # excluded destination: source address is left as sent
                self.assertEqual(rx[ip_class].src, pod_ip)

            # remove remote host from exclude list
            snat.cnat_exclude_subnet(exclude_subnet, isAdd=False)
            self.vapi.cnat_session_purge()

            rxs = self.send_and_expect(
                self.pg0,
                outbound * N_PKTS,
                self.pg1)

            for rx in rxs:
                self.assert_packet_checksums_valid(rx)
                self.assertEqual(rx[ip_class].dst, host_ip)
                self.assertEqual(rx[l4p].dport, sent_dport)
                self.assertEqual(rx[ip_class].src, srcNatAddr)

    def test_cnat6_sourcenat(self):
        # """ CNat Source Nat ipv6 """
        self.cnat_test_sourcenat(self.pg2.remote_hosts[0].ip6, TCP, True)
        self.cnat_test_sourcenat(self.pg2.remote_hosts[0].ip6, UDP, True)

    def test_cnat4_sourcenat(self):
        # """ CNat Source Nat ipv4 """
        self.cnat_test_sourcenat(self.pg2.remote_hosts[0].ip4, TCP)
        self.cnat_test_sourcenat(self.pg2.remote_hosts[0].ip4, UDP)
|
|
|
|
|
|
|
|
# Script entry point: run this module's tests with the VPP test runner.
if __name__ == '__main__':
    unittest.main(testRunner=VppTestRunner)
|