d9b0c6fbf7
Drop pycodestyle for code style checking in favor of black. Black is much faster, stable PEP8 compliant code style checker offering also automatic formatting. It aims to be very stable and produce smallest diffs. It's used by many small and big projects. Running checkstyle with black takes a few seconds with a terse output. Thus, test-checkstyle-diff is no longer necessary. Expand scope of checkstyle to all python files in the repo, replacing test-checkstyle with checkstyle-python. Also, fixstyle-python is now available for automatic style formatting. Note: python virtualenv has been consolidated in test/Makefile, test/requirements*.txt which will eventually be moved to a central location. This is required to simply the automated generation of docker executor images in the CI. Type: improvement Change-Id: I022a326603485f58585e879ac0f697fceefbc9c8 Signed-off-by: Klement Sekera <klement.sekera@gmail.com> Signed-off-by: Dave Wallace <dwallacelf@gmail.com>
359 lines
11 KiB
Python
359 lines
11 KiB
Python
#!/usr/bin/env python3
|
|
|
|
from framework import VppTestCase
|
|
from ipaddress import IPv4Address
|
|
from ipaddress import IPv6Address
|
|
from scapy.contrib.gtp import *
|
|
from scapy.all import *
|
|
|
|
|
|
class TestSRv6EndMGTP4E(VppTestCase):
|
|
"""SRv6 End.M.GTP4.E (SRv6 -> GTP-U)"""
|
|
|
|
@classmethod
|
|
def setUpClass(cls):
|
|
super(TestSRv6EndMGTP4E, cls).setUpClass()
|
|
try:
|
|
cls.create_pg_interfaces(range(2))
|
|
cls.pg_if_i = cls.pg_interfaces[0]
|
|
cls.pg_if_o = cls.pg_interfaces[1]
|
|
|
|
cls.pg_if_i.config_ip6()
|
|
cls.pg_if_o.config_ip4()
|
|
|
|
cls.ip4_dst = cls.pg_if_o.remote_ip4
|
|
# cls.ip4_src = cls.pg_if_o.local_ip4
|
|
cls.ip4_src = "192.168.192.10"
|
|
|
|
for pg_if in cls.pg_interfaces:
|
|
pg_if.admin_up()
|
|
pg_if.resolve_arp()
|
|
|
|
except Exception:
|
|
super(TestSRv6EndMGTP4E, cls).tearDownClass()
|
|
raise
|
|
|
|
def create_packets(self, inner):
|
|
|
|
ip4_dst = IPv4Address(str(self.ip4_dst))
|
|
# 32bit prefix + 32bit IPv4 DA + 8bit + 32bit TEID + 24bit
|
|
dst = b"\xaa" * 4 + ip4_dst.packed + b"\x11" + b"\xbb" * 4 + b"\x11" * 3
|
|
ip6_dst = IPv6Address(dst)
|
|
|
|
ip4_src = IPv4Address(str(self.ip4_src))
|
|
# 64bit prefix + 32bit IPv4 SA + 16 bit port + 16bit
|
|
src = b"\xcc" * 8 + ip4_src.packed + b"\xdd" * 2 + b"\x11" * 2
|
|
ip6_src = IPv6Address(src)
|
|
|
|
self.logger.info("ip4 dst: {}".format(ip4_dst))
|
|
self.logger.info("ip4 src: {}".format(ip4_src))
|
|
self.logger.info("ip6 dst (remote srgw): {}".format(ip6_dst))
|
|
self.logger.info("ip6 src (local srgw): {}".format(ip6_src))
|
|
|
|
pkts = list()
|
|
for d, s in inner:
|
|
pkt = (
|
|
Ether()
|
|
/ IPv6(dst=str(ip6_dst), src=str(ip6_src))
|
|
/ IPv6ExtHdrSegmentRouting()
|
|
/ IPv6(dst=d, src=s)
|
|
/ UDP(sport=1000, dport=23)
|
|
)
|
|
self.logger.info(pkt.show2(dump=True))
|
|
pkts.append(pkt)
|
|
|
|
return pkts
|
|
|
|
def test_srv6_mobile(self):
|
|
"""test_srv6_mobile"""
|
|
pkts = self.create_packets([("A::1", "B::1"), ("C::1", "D::1")])
|
|
|
|
self.vapi.cli(
|
|
"sr localsid address {} behavior end.m.gtp4.e ".format(pkts[0]["IPv6"].dst)
|
|
+ "v4src_position 64 fib-table 0"
|
|
)
|
|
self.logger.info(self.vapi.cli("show sr localsid"))
|
|
|
|
self.vapi.cli("clear errors")
|
|
|
|
self.pg0.add_stream(pkts)
|
|
self.pg_enable_capture(self.pg_interfaces)
|
|
self.pg_start()
|
|
|
|
self.logger.info(self.vapi.cli("show errors"))
|
|
self.logger.info(self.vapi.cli("show int address"))
|
|
|
|
capture = self.pg1.get_capture(len(pkts))
|
|
|
|
for pkt in capture:
|
|
self.logger.info(pkt.show2(dump=True))
|
|
self.assertEqual(pkt[IP].dst, self.ip4_dst)
|
|
self.assertEqual(pkt[IP].src, self.ip4_src)
|
|
self.assertEqual(pkt[GTP_U_Header].teid, 0xBBBBBBBB)
|
|
|
|
|
|
class TestSRv6TMGTP4D(VppTestCase):
|
|
"""SRv6 T.M.GTP4.D (GTP-U -> SRv6)"""
|
|
|
|
@classmethod
|
|
def setUpClass(cls):
|
|
super(TestSRv6TMGTP4D, cls).setUpClass()
|
|
try:
|
|
cls.create_pg_interfaces(range(2))
|
|
cls.pg_if_i = cls.pg_interfaces[0]
|
|
cls.pg_if_o = cls.pg_interfaces[1]
|
|
|
|
cls.pg_if_i.config_ip4()
|
|
cls.pg_if_i.config_ip6()
|
|
cls.pg_if_o.config_ip4()
|
|
cls.pg_if_o.config_ip6()
|
|
|
|
cls.ip4_dst = "1.1.1.1"
|
|
cls.ip4_src = "2.2.2.2"
|
|
|
|
cls.ip6_dst = cls.pg_if_o.remote_ip6
|
|
|
|
for pg_if in cls.pg_interfaces:
|
|
pg_if.admin_up()
|
|
pg_if.resolve_arp()
|
|
pg_if.resolve_ndp(timeout=5)
|
|
|
|
except Exception:
|
|
super(TestSRv6TMGTP4D, cls).tearDownClass()
|
|
raise
|
|
|
|
def create_packets(self, inner):
|
|
|
|
ip4_dst = IPv4Address(str(self.ip4_dst))
|
|
|
|
ip4_src = IPv4Address(str(self.ip4_src))
|
|
|
|
self.logger.info("ip4 dst: {}".format(ip4_dst))
|
|
self.logger.info("ip4 src: {}".format(ip4_src))
|
|
|
|
pkts = list()
|
|
for d, s in inner:
|
|
pkt = (
|
|
Ether()
|
|
/ IP(dst=str(ip4_dst), src=str(ip4_src))
|
|
/ UDP(sport=2152, dport=2152)
|
|
/ GTP_U_Header(gtp_type="g_pdu", teid=200)
|
|
/ IPv6(dst=d, src=s)
|
|
/ UDP(sport=1000, dport=23)
|
|
)
|
|
self.logger.info(pkt.show2(dump=True))
|
|
pkts.append(pkt)
|
|
|
|
return pkts
|
|
|
|
def test_srv6_mobile(self):
|
|
"""test_srv6_mobile"""
|
|
pkts = self.create_packets([("A::1", "B::1"), ("C::1", "D::1")])
|
|
|
|
self.vapi.cli("set sr encaps source addr A1::1")
|
|
self.vapi.cli("sr policy add bsid D4:: next D2:: next D3::")
|
|
self.vapi.cli(
|
|
"sr policy add bsid D5:: behavior t.m.gtp4.d D4::/32 "
|
|
+ "v6src_prefix C1::/64 nhtype ipv6 fib-table 0 drop-in"
|
|
)
|
|
self.vapi.cli("sr steer l3 {}/32 via bsid D5::".format(self.ip4_dst))
|
|
self.vapi.cli("ip route add D2::/32 via {}".format(self.ip6_dst))
|
|
|
|
self.logger.info(self.vapi.cli("show sr steer"))
|
|
self.logger.info(self.vapi.cli("show sr policies"))
|
|
|
|
self.vapi.cli("clear errors")
|
|
|
|
self.pg0.add_stream(pkts)
|
|
self.pg_enable_capture(self.pg_interfaces)
|
|
self.pg_start()
|
|
|
|
self.logger.info(self.vapi.cli("show errors"))
|
|
self.logger.info(self.vapi.cli("show int address"))
|
|
|
|
capture = self.pg1.get_capture(len(pkts))
|
|
|
|
for pkt in capture:
|
|
self.logger.info(pkt.show2(dump=True))
|
|
self.logger.info(
|
|
"GTP4.D Address={}".format(
|
|
str(pkt[IPv6ExtHdrSegmentRouting].addresses[0])
|
|
)
|
|
)
|
|
self.assertEqual(
|
|
str(pkt[IPv6ExtHdrSegmentRouting].addresses[0]), "d4:0:101:101::c800:0"
|
|
)
|
|
|
|
|
|
class TestSRv6EndMGTP6E(VppTestCase):
|
|
"""SRv6 End.M.GTP6.E"""
|
|
|
|
@classmethod
|
|
def setUpClass(cls):
|
|
super(TestSRv6EndMGTP6E, cls).setUpClass()
|
|
try:
|
|
cls.create_pg_interfaces(range(2))
|
|
cls.pg_if_i = cls.pg_interfaces[0]
|
|
cls.pg_if_o = cls.pg_interfaces[1]
|
|
|
|
cls.pg_if_i.config_ip6()
|
|
cls.pg_if_o.config_ip6()
|
|
|
|
cls.ip6_nhop = cls.pg_if_o.remote_ip6
|
|
|
|
for pg_if in cls.pg_interfaces:
|
|
pg_if.admin_up()
|
|
pg_if.resolve_ndp(timeout=5)
|
|
|
|
except Exception:
|
|
super(TestSRv6EndMGTP6E, cls).tearDownClass()
|
|
raise
|
|
|
|
def create_packets(self, inner):
|
|
# 64bit prefix + 8bit QFI + 32bit TEID + 24bit
|
|
dst = b"\xaa" * 8 + b"\x00" + b"\xbb" * 4 + b"\x00" * 3
|
|
ip6_dst = IPv6Address(dst)
|
|
|
|
self.ip6_dst = ip6_dst
|
|
|
|
src = b"\xcc" * 8 + b"\xdd" * 4 + b"\x11" * 4
|
|
ip6_src = IPv6Address(src)
|
|
|
|
self.ip6_src = ip6_src
|
|
|
|
pkts = list()
|
|
for d, s in inner:
|
|
pkt = (
|
|
Ether()
|
|
/ IPv6(dst=str(ip6_dst), src=str(ip6_src))
|
|
/ IPv6ExtHdrSegmentRouting(
|
|
segleft=1, lastentry=0, tag=0, addresses=["a1::1"]
|
|
)
|
|
/ IPv6(dst=d, src=s)
|
|
/ UDP(sport=1000, dport=23)
|
|
)
|
|
self.logger.info(pkt.show2(dump=True))
|
|
pkts.append(pkt)
|
|
|
|
return pkts
|
|
|
|
def test_srv6_mobile(self):
|
|
"""test_srv6_mobile"""
|
|
pkts = self.create_packets([("A::1", "B::1"), ("C::1", "D::1")])
|
|
|
|
self.vapi.cli(
|
|
"sr localsid prefix {}/64 behavior end.m.gtp6.e fib-table 0".format(
|
|
pkts[0]["IPv6"].dst
|
|
)
|
|
)
|
|
self.vapi.cli("ip route add a1::/64 via {}".format(self.ip6_nhop))
|
|
self.logger.info(self.vapi.cli("show sr localsid"))
|
|
|
|
self.vapi.cli("clear errors")
|
|
|
|
self.pg0.add_stream(pkts)
|
|
self.pg_enable_capture(self.pg_interfaces)
|
|
self.pg_start()
|
|
|
|
self.logger.info(self.vapi.cli("show errors"))
|
|
self.logger.info(self.vapi.cli("show int address"))
|
|
|
|
capture = self.pg1.get_capture(len(pkts))
|
|
|
|
for pkt in capture:
|
|
self.logger.info(pkt.show2(dump=True))
|
|
self.assertEqual(pkt[IPv6].dst, "a1::1")
|
|
self.assertEqual(pkt[IPv6].src, str(self.ip6_src))
|
|
self.assertEqual(pkt[GTP_U_Header].teid, 0xBBBBBBBB)
|
|
|
|
|
|
class TestSRv6EndMGTP6D(VppTestCase):
|
|
"""SRv6 End.M.GTP6.D"""
|
|
|
|
@classmethod
|
|
def setUpClass(cls):
|
|
super(TestSRv6EndMGTP6D, cls).setUpClass()
|
|
try:
|
|
cls.create_pg_interfaces(range(2))
|
|
cls.pg_if_i = cls.pg_interfaces[0]
|
|
cls.pg_if_o = cls.pg_interfaces[1]
|
|
|
|
cls.pg_if_i.config_ip6()
|
|
cls.pg_if_o.config_ip6()
|
|
|
|
cls.ip6_nhop = cls.pg_if_o.remote_ip6
|
|
|
|
cls.ip6_dst = "2001::1"
|
|
cls.ip6_src = "2002::1"
|
|
|
|
for pg_if in cls.pg_interfaces:
|
|
pg_if.admin_up()
|
|
pg_if.resolve_ndp(timeout=5)
|
|
|
|
except Exception:
|
|
super(TestSRv6EndMGTP6D, cls).tearDownClass()
|
|
raise
|
|
|
|
def create_packets(self, inner):
|
|
|
|
ip6_dst = IPv6Address(str(self.ip6_dst))
|
|
|
|
ip6_src = IPv6Address(str(self.ip6_src))
|
|
|
|
self.logger.info("ip6 dst: {}".format(ip6_dst))
|
|
self.logger.info("ip6 src: {}".format(ip6_src))
|
|
|
|
pkts = list()
|
|
for d, s in inner:
|
|
pkt = (
|
|
Ether()
|
|
/ IPv6(dst=str(ip6_dst), src=str(ip6_src))
|
|
/ UDP(sport=2152, dport=2152)
|
|
/ GTP_U_Header(gtp_type="g_pdu", teid=200)
|
|
/ IPv6(dst=d, src=s)
|
|
/ UDP(sport=1000, dport=23)
|
|
)
|
|
self.logger.info(pkt.show2(dump=True))
|
|
pkts.append(pkt)
|
|
|
|
return pkts
|
|
|
|
def test_srv6_mobile(self):
|
|
"""test_srv6_mobile"""
|
|
pkts = self.create_packets([("A::1", "B::1"), ("C::1", "D::1")])
|
|
|
|
self.vapi.cli("set sr encaps source addr A1::1")
|
|
self.vapi.cli("sr policy add bsid D4:: next D2:: next D3::")
|
|
self.vapi.cli(
|
|
"sr localsid prefix 2001::/64 behavior end.m.gtp6.d "
|
|
+ "D4::/64 fib-table 0 drop-in"
|
|
)
|
|
self.vapi.cli("ip route add D2::/64 via {}".format(self.ip6_nhop))
|
|
|
|
self.logger.info(self.vapi.cli("show sr policies"))
|
|
self.logger.info(self.vapi.cli("show sr localsid"))
|
|
|
|
self.vapi.cli("clear errors")
|
|
|
|
self.pg0.add_stream(pkts)
|
|
self.pg_enable_capture(self.pg_interfaces)
|
|
self.pg_start()
|
|
|
|
self.logger.info(self.vapi.cli("show errors"))
|
|
self.logger.info(self.vapi.cli("show int address"))
|
|
|
|
capture = self.pg1.get_capture(len(pkts))
|
|
|
|
for pkt in capture:
|
|
self.logger.info(pkt.show2(dump=True))
|
|
self.logger.info(
|
|
"GTP6.D SID0={}".format(str(pkt[IPv6ExtHdrSegmentRouting].addresses[0]))
|
|
)
|
|
self.logger.info(
|
|
"GTP6.D SID1={}".format(str(pkt[IPv6ExtHdrSegmentRouting].addresses[1]))
|
|
)
|
|
self.assertEqual(str(pkt[IPv6ExtHdrSegmentRouting].addresses[0]), "2001::1")
|
|
self.assertEqual(
|
|
str(pkt[IPv6ExtHdrSegmentRouting].addresses[1]), "d4::c800:0"
|
|
)
|