7b8b465269
- Package update performed by 1. updating pip, pip-tools, setuptools 2. 'make test-refresh-deps' on ubuntu 22.04 3. fixing 'make test' and 'make docs' issues on ubuntu 22.04 4. 'make test-refresh-deps' on ubuntu 20.04 - Add dependency for 'make test-refresh-deps' to insure python venv is set up. - Update of python formatter, black, caused reformating of 41 python code files. Type: make Change-Id: I7cafdf4b5189065ac57cb6b254937f6e0897a924 Signed-off-by: Dave Wallace <dwallacelf@gmail.com>
356 lines
11 KiB
Python
356 lines
11 KiB
Python
#!/usr/bin/env python3
|
|
|
|
from framework import VppTestCase
|
|
from ipaddress import IPv4Address
|
|
from ipaddress import IPv6Address
|
|
from scapy.contrib.gtp import *
|
|
from scapy.all import *
|
|
|
|
|
|
class TestSRv6EndMGTP4E(VppTestCase):
|
|
"""SRv6 End.M.GTP4.E (SRv6 -> GTP-U)"""
|
|
|
|
@classmethod
|
|
def setUpClass(cls):
|
|
super(TestSRv6EndMGTP4E, cls).setUpClass()
|
|
try:
|
|
cls.create_pg_interfaces(range(2))
|
|
cls.pg_if_i = cls.pg_interfaces[0]
|
|
cls.pg_if_o = cls.pg_interfaces[1]
|
|
|
|
cls.pg_if_i.config_ip6()
|
|
cls.pg_if_o.config_ip4()
|
|
|
|
cls.ip4_dst = cls.pg_if_o.remote_ip4
|
|
# cls.ip4_src = cls.pg_if_o.local_ip4
|
|
cls.ip4_src = "192.168.192.10"
|
|
|
|
for pg_if in cls.pg_interfaces:
|
|
pg_if.admin_up()
|
|
pg_if.resolve_arp()
|
|
|
|
except Exception:
|
|
super(TestSRv6EndMGTP4E, cls).tearDownClass()
|
|
raise
|
|
|
|
def create_packets(self, inner):
|
|
ip4_dst = IPv4Address(str(self.ip4_dst))
|
|
# 32bit prefix + 32bit IPv4 DA + 8bit + 32bit TEID + 24bit
|
|
dst = b"\xaa" * 4 + ip4_dst.packed + b"\x11" + b"\xbb" * 4 + b"\x11" * 3
|
|
ip6_dst = IPv6Address(dst)
|
|
|
|
ip4_src = IPv4Address(str(self.ip4_src))
|
|
# 64bit prefix + 32bit IPv4 SA + 16 bit port + 16bit
|
|
src = b"\xcc" * 8 + ip4_src.packed + b"\xdd" * 2 + b"\x11" * 2
|
|
ip6_src = IPv6Address(src)
|
|
|
|
self.logger.info("ip4 dst: {}".format(ip4_dst))
|
|
self.logger.info("ip4 src: {}".format(ip4_src))
|
|
self.logger.info("ip6 dst (remote srgw): {}".format(ip6_dst))
|
|
self.logger.info("ip6 src (local srgw): {}".format(ip6_src))
|
|
|
|
pkts = list()
|
|
for d, s in inner:
|
|
pkt = (
|
|
Ether()
|
|
/ IPv6(dst=str(ip6_dst), src=str(ip6_src))
|
|
/ IPv6ExtHdrSegmentRouting()
|
|
/ IPv6(dst=d, src=s)
|
|
/ UDP(sport=1000, dport=23)
|
|
)
|
|
self.logger.info(pkt.show2(dump=True))
|
|
pkts.append(pkt)
|
|
|
|
return pkts
|
|
|
|
def test_srv6_mobile(self):
|
|
"""test_srv6_mobile"""
|
|
pkts = self.create_packets([("A::1", "B::1"), ("C::1", "D::1")])
|
|
|
|
self.vapi.cli(
|
|
"sr localsid address {} behavior end.m.gtp4.e ".format(pkts[0]["IPv6"].dst)
|
|
+ "v4src_position 64 fib-table 0"
|
|
)
|
|
self.logger.info(self.vapi.cli("show sr localsid"))
|
|
|
|
self.vapi.cli("clear errors")
|
|
|
|
self.pg0.add_stream(pkts)
|
|
self.pg_enable_capture(self.pg_interfaces)
|
|
self.pg_start()
|
|
|
|
self.logger.info(self.vapi.cli("show errors"))
|
|
self.logger.info(self.vapi.cli("show int address"))
|
|
|
|
capture = self.pg1.get_capture(len(pkts))
|
|
|
|
for pkt in capture:
|
|
self.logger.info(pkt.show2(dump=True))
|
|
self.assertEqual(pkt[IP].dst, self.ip4_dst)
|
|
self.assertEqual(pkt[IP].src, self.ip4_src)
|
|
self.assertEqual(pkt[GTP_U_Header].teid, 0xBBBBBBBB)
|
|
|
|
|
|
class TestSRv6TMGTP4D(VppTestCase):
|
|
"""SRv6 T.M.GTP4.D (GTP-U -> SRv6)"""
|
|
|
|
@classmethod
|
|
def setUpClass(cls):
|
|
super(TestSRv6TMGTP4D, cls).setUpClass()
|
|
try:
|
|
cls.create_pg_interfaces(range(2))
|
|
cls.pg_if_i = cls.pg_interfaces[0]
|
|
cls.pg_if_o = cls.pg_interfaces[1]
|
|
|
|
cls.pg_if_i.config_ip4()
|
|
cls.pg_if_i.config_ip6()
|
|
cls.pg_if_o.config_ip4()
|
|
cls.pg_if_o.config_ip6()
|
|
|
|
cls.ip4_dst = "1.1.1.1"
|
|
cls.ip4_src = "2.2.2.2"
|
|
|
|
cls.ip6_dst = cls.pg_if_o.remote_ip6
|
|
|
|
for pg_if in cls.pg_interfaces:
|
|
pg_if.admin_up()
|
|
pg_if.resolve_arp()
|
|
pg_if.resolve_ndp(timeout=5)
|
|
|
|
except Exception:
|
|
super(TestSRv6TMGTP4D, cls).tearDownClass()
|
|
raise
|
|
|
|
def create_packets(self, inner):
|
|
ip4_dst = IPv4Address(str(self.ip4_dst))
|
|
|
|
ip4_src = IPv4Address(str(self.ip4_src))
|
|
|
|
self.logger.info("ip4 dst: {}".format(ip4_dst))
|
|
self.logger.info("ip4 src: {}".format(ip4_src))
|
|
|
|
pkts = list()
|
|
for d, s in inner:
|
|
pkt = (
|
|
Ether()
|
|
/ IP(dst=str(ip4_dst), src=str(ip4_src))
|
|
/ UDP(sport=2152, dport=2152)
|
|
/ GTP_U_Header(gtp_type="g_pdu", teid=200)
|
|
/ IPv6(dst=d, src=s)
|
|
/ UDP(sport=1000, dport=23)
|
|
)
|
|
self.logger.info(pkt.show2(dump=True))
|
|
pkts.append(pkt)
|
|
|
|
return pkts
|
|
|
|
def test_srv6_mobile(self):
|
|
"""test_srv6_mobile"""
|
|
pkts = self.create_packets([("A::1", "B::1"), ("C::1", "D::1")])
|
|
|
|
self.vapi.cli("set sr encaps source addr A1::1")
|
|
self.vapi.cli("sr policy add bsid D4:: next D2:: next D3::")
|
|
self.vapi.cli(
|
|
"sr policy add bsid D5:: behavior t.m.gtp4.d D4::/32 "
|
|
+ "v6src_prefix C1::/64 nhtype ipv6 fib-table 0 drop-in"
|
|
)
|
|
self.vapi.cli("sr steer l3 {}/32 via bsid D5::".format(self.ip4_dst))
|
|
self.vapi.cli("ip route add D2::/32 via {}".format(self.ip6_dst))
|
|
|
|
self.logger.info(self.vapi.cli("show sr steer"))
|
|
self.logger.info(self.vapi.cli("show sr policies"))
|
|
|
|
self.vapi.cli("clear errors")
|
|
|
|
self.pg0.add_stream(pkts)
|
|
self.pg_enable_capture(self.pg_interfaces)
|
|
self.pg_start()
|
|
|
|
self.logger.info(self.vapi.cli("show errors"))
|
|
self.logger.info(self.vapi.cli("show int address"))
|
|
|
|
capture = self.pg1.get_capture(len(pkts))
|
|
|
|
for pkt in capture:
|
|
self.logger.info(pkt.show2(dump=True))
|
|
self.logger.info(
|
|
"GTP4.D Address={}".format(
|
|
str(pkt[IPv6ExtHdrSegmentRouting].addresses[0])
|
|
)
|
|
)
|
|
self.assertEqual(
|
|
str(pkt[IPv6ExtHdrSegmentRouting].addresses[0]), "d4:0:101:101::c800:0"
|
|
)
|
|
|
|
|
|
class TestSRv6EndMGTP6E(VppTestCase):
|
|
"""SRv6 End.M.GTP6.E"""
|
|
|
|
@classmethod
|
|
def setUpClass(cls):
|
|
super(TestSRv6EndMGTP6E, cls).setUpClass()
|
|
try:
|
|
cls.create_pg_interfaces(range(2))
|
|
cls.pg_if_i = cls.pg_interfaces[0]
|
|
cls.pg_if_o = cls.pg_interfaces[1]
|
|
|
|
cls.pg_if_i.config_ip6()
|
|
cls.pg_if_o.config_ip6()
|
|
|
|
cls.ip6_nhop = cls.pg_if_o.remote_ip6
|
|
|
|
for pg_if in cls.pg_interfaces:
|
|
pg_if.admin_up()
|
|
pg_if.resolve_ndp(timeout=5)
|
|
|
|
except Exception:
|
|
super(TestSRv6EndMGTP6E, cls).tearDownClass()
|
|
raise
|
|
|
|
def create_packets(self, inner):
|
|
# 64bit prefix + 8bit QFI + 32bit TEID + 24bit
|
|
dst = b"\xaa" * 8 + b"\x00" + b"\xbb" * 4 + b"\x00" * 3
|
|
ip6_dst = IPv6Address(dst)
|
|
|
|
self.ip6_dst = ip6_dst
|
|
|
|
src = b"\xcc" * 8 + b"\xdd" * 4 + b"\x11" * 4
|
|
ip6_src = IPv6Address(src)
|
|
|
|
self.ip6_src = ip6_src
|
|
|
|
pkts = list()
|
|
for d, s in inner:
|
|
pkt = (
|
|
Ether()
|
|
/ IPv6(dst=str(ip6_dst), src=str(ip6_src))
|
|
/ IPv6ExtHdrSegmentRouting(
|
|
segleft=1, lastentry=0, tag=0, addresses=["a1::1"]
|
|
)
|
|
/ IPv6(dst=d, src=s)
|
|
/ UDP(sport=1000, dport=23)
|
|
)
|
|
self.logger.info(pkt.show2(dump=True))
|
|
pkts.append(pkt)
|
|
|
|
return pkts
|
|
|
|
def test_srv6_mobile(self):
|
|
"""test_srv6_mobile"""
|
|
pkts = self.create_packets([("A::1", "B::1"), ("C::1", "D::1")])
|
|
|
|
self.vapi.cli(
|
|
"sr localsid prefix {}/64 behavior end.m.gtp6.e fib-table 0".format(
|
|
pkts[0]["IPv6"].dst
|
|
)
|
|
)
|
|
self.vapi.cli("ip route add a1::/64 via {}".format(self.ip6_nhop))
|
|
self.logger.info(self.vapi.cli("show sr localsid"))
|
|
|
|
self.vapi.cli("clear errors")
|
|
|
|
self.pg0.add_stream(pkts)
|
|
self.pg_enable_capture(self.pg_interfaces)
|
|
self.pg_start()
|
|
|
|
self.logger.info(self.vapi.cli("show errors"))
|
|
self.logger.info(self.vapi.cli("show int address"))
|
|
|
|
capture = self.pg1.get_capture(len(pkts))
|
|
|
|
for pkt in capture:
|
|
self.logger.info(pkt.show2(dump=True))
|
|
self.assertEqual(pkt[IPv6].dst, "a1::1")
|
|
self.assertEqual(pkt[IPv6].src, str(self.ip6_src))
|
|
self.assertEqual(pkt[GTP_U_Header].teid, 0xBBBBBBBB)
|
|
|
|
|
|
class TestSRv6EndMGTP6D(VppTestCase):
|
|
"""SRv6 End.M.GTP6.D"""
|
|
|
|
@classmethod
|
|
def setUpClass(cls):
|
|
super(TestSRv6EndMGTP6D, cls).setUpClass()
|
|
try:
|
|
cls.create_pg_interfaces(range(2))
|
|
cls.pg_if_i = cls.pg_interfaces[0]
|
|
cls.pg_if_o = cls.pg_interfaces[1]
|
|
|
|
cls.pg_if_i.config_ip6()
|
|
cls.pg_if_o.config_ip6()
|
|
|
|
cls.ip6_nhop = cls.pg_if_o.remote_ip6
|
|
|
|
cls.ip6_dst = "2001::1"
|
|
cls.ip6_src = "2002::1"
|
|
|
|
for pg_if in cls.pg_interfaces:
|
|
pg_if.admin_up()
|
|
pg_if.resolve_ndp(timeout=5)
|
|
|
|
except Exception:
|
|
super(TestSRv6EndMGTP6D, cls).tearDownClass()
|
|
raise
|
|
|
|
def create_packets(self, inner):
|
|
ip6_dst = IPv6Address(str(self.ip6_dst))
|
|
|
|
ip6_src = IPv6Address(str(self.ip6_src))
|
|
|
|
self.logger.info("ip6 dst: {}".format(ip6_dst))
|
|
self.logger.info("ip6 src: {}".format(ip6_src))
|
|
|
|
pkts = list()
|
|
for d, s in inner:
|
|
pkt = (
|
|
Ether()
|
|
/ IPv6(dst=str(ip6_dst), src=str(ip6_src))
|
|
/ UDP(sport=2152, dport=2152)
|
|
/ GTP_U_Header(gtp_type="g_pdu", teid=200)
|
|
/ IPv6(dst=d, src=s)
|
|
/ UDP(sport=1000, dport=23)
|
|
)
|
|
self.logger.info(pkt.show2(dump=True))
|
|
pkts.append(pkt)
|
|
|
|
return pkts
|
|
|
|
def test_srv6_mobile(self):
|
|
"""test_srv6_mobile"""
|
|
pkts = self.create_packets([("A::1", "B::1"), ("C::1", "D::1")])
|
|
|
|
self.vapi.cli("set sr encaps source addr A1::1")
|
|
self.vapi.cli("sr policy add bsid D4:: next D2:: next D3::")
|
|
self.vapi.cli(
|
|
"sr localsid prefix 2001::/64 behavior end.m.gtp6.d "
|
|
+ "D4::/64 fib-table 0 drop-in"
|
|
)
|
|
self.vapi.cli("ip route add D2::/64 via {}".format(self.ip6_nhop))
|
|
|
|
self.logger.info(self.vapi.cli("show sr policies"))
|
|
self.logger.info(self.vapi.cli("show sr localsid"))
|
|
|
|
self.vapi.cli("clear errors")
|
|
|
|
self.pg0.add_stream(pkts)
|
|
self.pg_enable_capture(self.pg_interfaces)
|
|
self.pg_start()
|
|
|
|
self.logger.info(self.vapi.cli("show errors"))
|
|
self.logger.info(self.vapi.cli("show int address"))
|
|
|
|
capture = self.pg1.get_capture(len(pkts))
|
|
|
|
for pkt in capture:
|
|
self.logger.info(pkt.show2(dump=True))
|
|
self.logger.info(
|
|
"GTP6.D SID0={}".format(str(pkt[IPv6ExtHdrSegmentRouting].addresses[0]))
|
|
)
|
|
self.logger.info(
|
|
"GTP6.D SID1={}".format(str(pkt[IPv6ExtHdrSegmentRouting].addresses[1]))
|
|
)
|
|
self.assertEqual(str(pkt[IPv6ExtHdrSegmentRouting].addresses[0]), "2001::1")
|
|
self.assertEqual(
|
|
str(pkt[IPv6ExtHdrSegmentRouting].addresses[1]), "d4::c800:0"
|
|
)
|