vpp/test/test_srmpls.py
Klement Sekera d9b0c6fbf7 tests: replace pycodestyle with black
Drop pycodestyle for code style checking in favor of black. Black is
much faster, stable PEP8 compliant code style checker offering also
automatic formatting. It aims to be very stable and produce smallest
diffs. It's used by many small and big projects.

Running checkstyle with black takes a few seconds with a terse output.
Thus, test-checkstyle-diff is no longer necessary.

Expand scope of checkstyle to all python files in the repo, replacing
test-checkstyle with checkstyle-python.

Also, fixstyle-python is now available for automatic style formatting.

Note: python virtualenv has been consolidated in test/Makefile,
test/requirements*.txt which will eventually be moved to a central
location.  This is required to simply the automated generation of
docker executor images in the CI.

Type: improvement
Change-Id: I022a326603485f58585e879ac0f697fceefbc9c8
Signed-off-by: Klement Sekera <klement.sekera@gmail.com>
Signed-off-by: Dave Wallace <dwallacelf@gmail.com>
2022-05-10 18:52:08 +00:00

316 lines
8.9 KiB
Python

#!/usr/bin/env python3
import unittest
import socket
from framework import VppTestCase, VppTestRunner
from vpp_ip import DpoProto
from vpp_ip_route import (
VppIpRoute,
VppRoutePath,
VppMplsRoute,
VppIpTable,
VppMplsTable,
VppMplsLabel,
)
from vpp_mpls_tunnel_interface import VppMPLSTunnelInterface
from scapy.packet import Raw
from scapy.layers.l2 import Ether
from scapy.layers.inet import IP, UDP, ICMP
from scapy.layers.inet6 import IPv6, ICMPv6TimeExceeded
from scapy.contrib.mpls import MPLS
def verify_filter(capture, sent):
if not len(capture) == len(sent):
# filter out any IPv6 RAs from the capture
for p in capture:
if p.haslayer(IPv6):
capture.remove(p)
return capture
def verify_mpls_stack(tst, rx, mpls_labels):
# the rx'd packet has the MPLS label popped
eth = rx[Ether]
tst.assertEqual(eth.type, 0x8847)
rx_mpls = rx[MPLS]
for ii in range(len(mpls_labels)):
tst.assertEqual(rx_mpls.label, mpls_labels[ii].value)
tst.assertEqual(rx_mpls.cos, mpls_labels[ii].exp)
tst.assertEqual(rx_mpls.ttl, mpls_labels[ii].ttl)
if ii == len(mpls_labels) - 1:
tst.assertEqual(rx_mpls.s, 1)
else:
# not end of stack
tst.assertEqual(rx_mpls.s, 0)
# pop the label to expose the next
rx_mpls = rx_mpls[MPLS].payload
class TestSRMPLS(VppTestCase):
"""SR-MPLS Test Case"""
@classmethod
def setUpClass(cls):
super(TestSRMPLS, cls).setUpClass()
@classmethod
def tearDownClass(cls):
super(TestSRMPLS, cls).tearDownClass()
def setUp(self):
super(TestSRMPLS, self).setUp()
# create 2 pg interfaces
self.create_pg_interfaces(range(4))
# setup both interfaces
# assign them different tables.
table_id = 0
self.tables = []
tbl = VppMplsTable(self, 0)
tbl.add_vpp_config()
self.tables.append(tbl)
for i in self.pg_interfaces:
i.admin_up()
i.config_ip4()
i.resolve_arp()
i.config_ip6()
i.resolve_ndp()
i.enable_mpls()
def tearDown(self):
for i in self.pg_interfaces:
i.unconfig_ip4()
i.unconfig_ip6()
i.disable_mpls()
i.admin_down()
super(TestSRMPLS, self).tearDown()
def create_stream_ip4(self, src_if, dst_ip, ip_ttl=64, ip_dscp=0):
self.reset_packet_infos()
pkts = []
for i in range(0, 257):
info = self.create_packet_info(src_if, src_if)
payload = self.info_to_payload(info)
p = (
Ether(dst=src_if.local_mac, src=src_if.remote_mac)
/ IP(src=src_if.remote_ip4, dst=dst_ip, ttl=ip_ttl, tos=ip_dscp)
/ UDP(sport=1234, dport=1234)
/ Raw(payload)
)
info.data = p.copy()
pkts.append(p)
return pkts
def verify_capture_labelled_ip4(
self, src_if, capture, sent, mpls_labels, ip_ttl=None
):
try:
capture = verify_filter(capture, sent)
self.assertEqual(len(capture), len(sent))
for i in range(len(capture)):
tx = sent[i]
rx = capture[i]
tx_ip = tx[IP]
rx_ip = rx[IP]
verify_mpls_stack(self, rx, mpls_labels)
self.assertEqual(rx_ip.src, tx_ip.src)
self.assertEqual(rx_ip.dst, tx_ip.dst)
if not ip_ttl:
# IP processing post pop has decremented the TTL
self.assertEqual(rx_ip.ttl + 1, tx_ip.ttl)
else:
self.assertEqual(rx_ip.ttl, ip_ttl)
except:
raise
def verify_capture_tunneled_ip4(self, src_if, capture, sent, mpls_labels):
try:
capture = verify_filter(capture, sent)
self.assertEqual(len(capture), len(sent))
for i in range(len(capture)):
tx = sent[i]
rx = capture[i]
tx_ip = tx[IP]
rx_ip = rx[IP]
verify_mpls_stack(self, rx, mpls_labels)
self.assertEqual(rx_ip.src, tx_ip.src)
self.assertEqual(rx_ip.dst, tx_ip.dst)
# IP processing post pop has decremented the TTL
self.assertEqual(rx_ip.ttl + 1, tx_ip.ttl)
except:
raise
def test_sr_mpls(self):
"""SR MPLS"""
#
# A simple MPLS xconnect - neos label in label out
#
route_32_eos = VppMplsRoute(
self,
32,
0,
[
VppRoutePath(
self.pg0.remote_ip4, self.pg0.sw_if_index, labels=[VppMplsLabel(32)]
)
],
)
route_32_eos.add_vpp_config()
#
# A binding SID with only one label
#
self.vapi.sr_mpls_policy_add(999, 1, 0, [32])
#
# A labeled IP route that resolves thru the binding SID
#
ip_10_0_0_1 = VppIpRoute(
self,
"10.0.0.1",
32,
[
VppRoutePath(
"0.0.0.0", 0xFFFFFFFF, nh_via_label=999, labels=[VppMplsLabel(55)]
)
],
)
ip_10_0_0_1.add_vpp_config()
tx = self.create_stream_ip4(self.pg1, "10.0.0.1")
rx = self.send_and_expect(self.pg1, tx, self.pg0)
self.verify_capture_labelled_ip4(
self.pg0, rx, tx, [VppMplsLabel(32), VppMplsLabel(55)]
)
#
# An unlabeled IP route that resolves thru the binding SID
#
ip_10_0_0_1 = VppIpRoute(
self,
"10.0.0.2",
32,
[VppRoutePath("0.0.0.0", 0xFFFFFFFF, nh_via_label=999)],
)
ip_10_0_0_1.add_vpp_config()
tx = self.create_stream_ip4(self.pg1, "10.0.0.2")
rx = self.send_and_expect(self.pg1, tx, self.pg0)
self.verify_capture_labelled_ip4(self.pg0, rx, tx, [VppMplsLabel(32)])
self.vapi.sr_mpls_policy_del(999)
#
# this time the SID has many labels pushed
#
self.vapi.sr_mpls_policy_add(999, 1, 0, [32, 33, 34])
tx = self.create_stream_ip4(self.pg1, "10.0.0.1")
rx = self.send_and_expect(self.pg1, tx, self.pg0)
self.verify_capture_labelled_ip4(
self.pg0,
rx,
tx,
[VppMplsLabel(32), VppMplsLabel(33), VppMplsLabel(34), VppMplsLabel(55)],
)
tx = self.create_stream_ip4(self.pg1, "10.0.0.2")
rx = self.send_and_expect(self.pg1, tx, self.pg0)
self.verify_capture_labelled_ip4(
self.pg0, rx, tx, [VppMplsLabel(32), VppMplsLabel(33), VppMplsLabel(34)]
)
#
# Resolve an MPLS tunnel via the SID
#
mpls_tun = VppMPLSTunnelInterface(
self,
[
VppRoutePath(
"0.0.0.0",
0xFFFFFFFF,
nh_via_label=999,
labels=[VppMplsLabel(44), VppMplsLabel(46)],
)
],
)
mpls_tun.add_vpp_config()
mpls_tun.admin_up()
#
# add an unlabelled route through the new tunnel
#
route_10_0_0_3 = VppIpRoute(
self, "10.0.0.3", 32, [VppRoutePath("0.0.0.0", mpls_tun._sw_if_index)]
)
route_10_0_0_3.add_vpp_config()
self.logger.info(self.vapi.cli("sh mpls tun 0"))
self.logger.info(self.vapi.cli("sh adj 21"))
tx = self.create_stream_ip4(self.pg1, "10.0.0.3")
rx = self.send_and_expect(self.pg1, tx, self.pg0)
self.verify_capture_tunneled_ip4(
self.pg0,
rx,
tx,
[
VppMplsLabel(32),
VppMplsLabel(33),
VppMplsLabel(34),
VppMplsLabel(44),
VppMplsLabel(46),
],
)
#
# add a labelled route through the new tunnel
#
route_10_0_0_3 = VppIpRoute(
self,
"10.0.0.4",
32,
[VppRoutePath("0.0.0.0", mpls_tun._sw_if_index, labels=[VppMplsLabel(55)])],
)
route_10_0_0_3.add_vpp_config()
tx = self.create_stream_ip4(self.pg1, "10.0.0.4")
rx = self.send_and_expect(self.pg1, tx, self.pg0)
self.verify_capture_tunneled_ip4(
self.pg0,
rx,
tx,
[
VppMplsLabel(32),
VppMplsLabel(33),
VppMplsLabel(34),
VppMplsLabel(44),
VppMplsLabel(46),
VppMplsLabel(55),
],
)
self.vapi.sr_mpls_policy_del(999)
if __name__ == "__main__":
unittest.main(testRunner=VppTestRunner)