d9b0c6fbf7
Drop pycodestyle for code style checking in favor of black. Black is a much faster, stable, PEP8-compliant code style checker that also offers automatic formatting. It aims to be very stable and to produce the smallest diffs. It's used by many small and big projects. Running checkstyle with black takes a few seconds and produces terse output. Thus, test-checkstyle-diff is no longer necessary. Expand the scope of checkstyle to all python files in the repo, replacing test-checkstyle with checkstyle-python. Also, fixstyle-python is now available for automatic style formatting. Note: the python virtualenv has been consolidated in test/Makefile and test/requirements*.txt, which will eventually be moved to a central location. This is required to simplify the automated generation of docker executor images in the CI. Type: improvement Change-Id: I022a326603485f58585e879ac0f697fceefbc9c8 Signed-off-by: Klement Sekera <klement.sekera@gmail.com> Signed-off-by: Dave Wallace <dwallacelf@gmail.com>
585 lines
17 KiB
Python
585 lines
17 KiB
Python
#!/usr/bin/env python3
|
|
import random
|
|
import unittest
|
|
import datetime
|
|
import re
|
|
|
|
from scapy.packet import Raw
|
|
from scapy.layers.l2 import Ether
|
|
from scapy.layers.inet import IP, UDP
|
|
from scapy.layers.inet6 import IPv6
|
|
|
|
from framework import VppTestCase, VppTestRunner
|
|
from vpp_sub_interface import VppP2PSubint
|
|
from vpp_ip import DpoProto
|
|
from vpp_ip_route import VppIpRoute, VppRoutePath
|
|
from vpp_papi import mac_pton
|
|
|
|
|
|
class P2PEthernetAPI(VppTestCase):
    """P2P Ethernet tests

    API-level checks: p2p ethernet sub-interfaces are created and deleted
    and the outcome is verified against the "show interface" CLI output.
    """

    # Sub-interfaces built by create_p2p_ethernet(). The list is a class
    # attribute, so it is shared by every instance of this test case.
    p2p_sub_ifs = []

    @classmethod
    def setUpClass(cls):
        """Create four pg interfaces and set each of them admin-up."""
        super().setUpClass()

        cls.create_pg_interfaces(range(4))

        for pg_if in cls.pg_interfaces:
            pg_if.admin_up()

    @classmethod
    def tearDownClass(cls):
        """Delegate class-level cleanup to the parent test case."""
        super().tearDownClass()

    def create_p2p_ethernet(self, parent_if, sub_id, remote_mac):
        """Build a VppP2PSubint and record it in ``p2p_sub_ifs``.

        :param parent_if: parent interface handed to VppP2PSubint
        :param sub_id: sub-interface id handed to VppP2PSubint
        :param remote_mac: remote MAC as a string; converted with
            mac_pton() before it is passed on
        """
        subif = VppP2PSubint(self, parent_if, sub_id, mac_pton(remote_mac))
        self.p2p_sub_ifs.append(subif)

    def delete_p2p_ethernet(self, parent_if, remote_mac):
        """Call the p2p_ethernet_del API for ``remote_mac`` on ``parent_if``.

        :param parent_if: interface whose ``sw_if_index`` is sent to the API
        :param remote_mac: remote MAC as a string; converted with mac_pton()
        """
        packed_mac = mac_pton(remote_mac)
        self.vapi.p2p_ethernet_del(parent_if.sw_if_index, packed_mac)

    def test_api(self):
        """delete/create p2p subif"""
        self.logger.info("FFP_TEST_START_0000")

        permanent = (
            (1, "de:ad:00:00:00:01"),
            (2, "de:ad:00:00:00:02"),
        )
        for sub_id, remote_mac in permanent:
            self.create_p2p_ethernet(self.pg0, sub_id, remote_mac)
        listing = self.vapi.cli("show interface")

        for name in ("pg0.1", "pg0.2"):
            self.assertIn(name, listing)
        self.assertNotIn("pg0.5", listing)

        # create the pg0.5 subif and check that it shows up
        self.create_p2p_ethernet(self.pg0, 5, "de:ad:00:00:00:ff")
        listing = self.vapi.cli("show interface")
        self.assertIn("pg0.5", listing)

        # delete the pg0.5 subif again; the other two must survive
        self.delete_p2p_ethernet(self.pg0, "de:ad:00:00:00:ff")

        listing = self.vapi.cli("show interface")

        for name in ("pg0.1", "pg0.2"):
            self.assertIn(name, listing)
        self.assertNotIn("pg0.5", listing)

        self.logger.info("FFP_TEST_FINISH_0000")

    def test_p2p_subif_creation_1k(self):
        """create 1k of p2p subifs"""
        self.logger.info("FFP_TEST_START_0001")

        clients = 1000
        base_mac = int("dead00000000", 16)
        macs = []

        for sub_id in range(1, clients + 1):
            try:
                # Render base_mac + sub_id as hex and join the digit pairs
                # with ":" to obtain the MAC string for this sub-interface.
                hex_mac = "{:02x}".format(base_mac + sub_id)
                macs.append(":".join(re.findall("..", hex_mac)))
                self.vapi.p2p_ethernet_add(
                    self.pg2.sw_if_index, mac_pton(macs[sub_id - 1]), sub_id
                )
            except Exception:
                # Log which sub-interface failed, then let the error surface.
                self.logger.info(
                    "Failed to create subif %d %s" % (sub_id, macs[sub_id - 1])
                )
                raise

        # Every created sub-interface is expected on a line starting "pg2.".
        lines = self.vapi.cli("show interface").split("\n")
        count = sum(1 for line in lines if line.startswith("pg2."))
        self.assertEqual(count, clients)

        self.logger.info("FFP_TEST_FINISH_0001")
|
|
|
|
|
|
class P2PEthernetIPV6(VppTestCase):
    """P2P Ethernet IPv6 tests

    Each test runs with two p2p ethernet sub-interfaces (ids 1 and 2) on
    pg0, created in setUp() and removed in tearDown(), and sends IPv6/UDP
    packets between pg0 and pg1.
    """

    # Both lists are class attributes shared by every test instance:
    # setUp() empties `packets`, tearDown() drains `p2p_sub_ifs`.
    p2p_sub_ifs = []
    packets = []

    @classmethod
    def setUpClass(cls):
        """Create three pg interfaces and prepare pg0/pg1 for IPv6."""
        super(P2PEthernetIPV6, cls).setUpClass()

        # Create pg interfaces
        cls.create_pg_interfaces(range(3))

        # Packet sizes create_stream() picks from when no size is given
        cls.pg_if_packet_sizes = [64, 512, 1518, 9018]

        # Set up all interfaces
        for i in cls.pg_interfaces:
            i.admin_up()

        # pg0 gets remote hosts and neighbors only; tests that need an
        # IPv6 address on pg0 call config_ip6() themselves.
        cls.pg0.generate_remote_hosts(3)
        cls.pg0.configure_ipv6_neighbors()

        cls.pg1.config_ip6()
        cls.pg1.generate_remote_hosts(3)
        cls.pg1.configure_ipv6_neighbors()
        cls.pg1.disable_ipv6_ra()

    @classmethod
    def tearDownClass(cls):
        """Delegate class-level cleanup to the parent test case."""
        super(P2PEthernetIPV6, cls).tearDownClass()

    def setUp(self):
        """Reset the packet list and create the two p2p sub-interfaces."""
        super(P2PEthernetIPV6, self).setUp()
        # Empty the shared list in place. Removing items while iterating
        # over the same list skips every other element, which used to
        # leave stale packets behind after a test queued more than one.
        self.packets.clear()
        self.p2p_sub_ifs.append(
            self.create_p2p_ethernet(self.pg0, 1, self.pg0._remote_hosts[0].mac)
        )
        self.p2p_sub_ifs.append(
            self.create_p2p_ethernet(self.pg0, 2, self.pg0._remote_hosts[1].mac)
        )
        self.vapi.cli("trace add p2p-ethernet-input 50")

    def tearDown(self):
        """Delete every recorded p2p sub-interface, newest first."""
        while self.p2p_sub_ifs:
            p2p = self.p2p_sub_ifs.pop()
            self.delete_p2p_ethernet(p2p)

        super(P2PEthernetIPV6, self).tearDown()

    def create_p2p_ethernet(self, parent_if, sub_id, remote_mac):
        """Create a p2p sub-interface, bring it up and configure IPv6.

        :param parent_if: parent interface handed to VppP2PSubint
        :param sub_id: sub-interface id handed to VppP2PSubint
        :param remote_mac: remote MAC as a string; converted with mac_pton()
        :returns: the new VppP2PSubint, admin-up, with IPv6 configured and
            router advertisements disabled
        """
        p2p = VppP2PSubint(self, parent_if, sub_id, mac_pton(remote_mac))
        p2p.admin_up()
        p2p.config_ip6()
        p2p.disable_ipv6_ra()
        return p2p

    def delete_p2p_ethernet(self, p2p):
        """Unconfigure IPv6 on ``p2p``, set it down and delete it via API.

        :param p2p: sub-interface object returned by create_p2p_ethernet()
        """
        p2p.unconfig_ip6()
        p2p.admin_down()
        self.vapi.p2p_ethernet_del(p2p.parent.sw_if_index, p2p.p2p_remote_mac)

    def create_stream(
        self, src_mac=None, dst_mac=None, src_ip=None, dst_ip=None, size=None
    ):
        """Build one Ether/IPv6/UDP packet with a 20-byte 0xa5 payload.

        :param src_mac: Ethernet source address
        :param dst_mac: Ethernet destination address
        :param src_ip: IPv6 source address
        :param dst_ip: IPv6 destination address
        :param size: size passed to extend_packet(); when None a random
            entry of ``pg_if_packet_sizes`` is used
        :returns: the packet after extend_packet() was applied to it
        """
        pkt_size = size
        if size is None:
            pkt_size = random.choice(self.pg_if_packet_sizes)
        p = Ether(src=src_mac, dst=dst_mac)
        p /= IPv6(src=src_ip, dst=dst_ip)
        p /= UDP(sport=1234, dport=4321) / Raw(b"\xa5" * 20)
        self.extend_packet(p, pkt_size)
        return p

    def send_packets(self, src_if=None, dst_if=None, packets=None, count=None):
        """Send packets on ``src_if`` and return the capture from ``dst_if``.

        :param src_if: interface the stream is added to
        :param dst_if: interface capture is enabled on and read from
        :param packets: packets to send; defaults to ``self.packets``
        :param count: number of packets expected in the capture; defaults
            to ``len(packets)``
        :returns: result of ``dst_if.get_capture(count)``
        """
        self.pg_enable_capture([dst_if])
        if packets is None:
            packets = self.packets
        src_if.add_stream(packets)
        self.pg_start()
        if count is None:
            count = len(packets)
        return dst_if.get_capture(count)

    def test_no_p2p_subif(self):
        """standard routing without p2p subinterfaces"""
        self.logger.info("FFP_TEST_START_0001")

        self.pg0.config_ip6()
        route_8000 = VppIpRoute(
            self,
            "8000::",
            64,
            [VppRoutePath(self.pg0.remote_ip6, self.pg0.sw_if_index)],
        )
        route_8000.add_vpp_config()

        # Rebinding creates an instance attribute, so the shared
        # class-level list is left untouched by this test.
        self.packets = [
            (
                Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac)
                / IPv6(src="3001::1", dst="8000::100")
                / UDP(sport=1234, dport=1234)
                / Raw(b"\xa5" * 100)
            )
        ]
        self.send_packets(self.pg1, self.pg0)

        self.pg0.unconfig_ip6()
        self.logger.info("FFP_TEST_FINISH_0001")

    def test_ip6_rx_p2p_subif(self):
        """receive ipv6 packet via p2p subinterface"""
        self.logger.info("FFP_TEST_START_0002")

        route_9001 = VppIpRoute(
            self,
            "9001::",
            64,
            [VppRoutePath(self.pg1.remote_ip6, self.pg1.sw_if_index)],
        )
        route_9001.add_vpp_config()

        # Source MAC is the remote MAC the first p2p subif was created with.
        self.packets.append(
            self.create_stream(
                src_mac=self.pg0._remote_hosts[0].mac,
                dst_mac=self.pg0.local_mac,
                src_ip=self.p2p_sub_ifs[0].remote_ip6,
                dst_ip="9001::100",
            )
        )

        self.send_packets(self.pg0, self.pg1, self.packets)
        self.assert_packet_counter_equal("p2p-ethernet-input", 1)

        route_9001.remove_vpp_config()
        self.logger.info("FFP_TEST_FINISH_0002")

    def test_ip6_rx_p2p_subif_route(self):
        """route rx ip6 packet not matching p2p subinterface"""
        self.logger.info("FFP_TEST_START_0003")

        self.pg0.config_ip6()

        route_3 = VppIpRoute(
            self,
            "9000::",
            64,
            [VppRoutePath(self.pg1._remote_hosts[0].ip6, self.pg1.sw_if_index)],
        )
        route_3.add_vpp_config()

        # Source MAC differs from the remote MACs used for the p2p subifs.
        self.packets.append(
            self.create_stream(
                src_mac="02:03:00:00:ff:ff",
                dst_mac=self.pg0.local_mac,
                src_ip="a000::100",
                dst_ip="9000::100",
            )
        )

        self.send_packets(self.pg0, self.pg1)

        self.pg0.unconfig_ip6()

        route_3.remove_vpp_config()

        self.logger.info("FFP_TEST_FINISH_0003")

    def test_ip6_rx_p2p_subif_drop(self):
        """drop rx packet not matching p2p subinterface"""
        self.logger.info("FFP_TEST_START_0004")

        # NOTE(review): this route is never removed by the test itself;
        # presumably framework teardown cleans it up - confirm.
        route_9001 = VppIpRoute(
            self,
            "9000::",
            64,
            [VppRoutePath(self.pg1._remote_hosts[0].ip6, self.pg1.sw_if_index)],
        )
        route_9001.add_vpp_config()

        self.packets.append(
            self.create_stream(
                src_mac="02:03:00:00:ff:ff",
                dst_mac=self.pg0.local_mac,
                src_ip="a000::100",
                dst_ip="9000::100",
            )
        )

        # no packet received
        self.send_packets(self.pg0, self.pg1, count=0)
        self.logger.info("FFP_TEST_FINISH_0004")

    def test_ip6_tx_p2p_subif(self):
        """send packet via p2p subinterface"""
        self.logger.info("FFP_TEST_START_0005")

        self.pg0.config_ip6()

        # 8000::/64 goes via pg0 itself, 8001::/64 and 8002::/64 go via
        # the first and second p2p sub-interface respectively.
        route_8000 = VppIpRoute(
            self,
            "8000::",
            64,
            [VppRoutePath(self.pg0.remote_hosts[0].ip6, self.pg0.sw_if_index)],
        )
        route_8000.add_vpp_config()
        route_8001 = VppIpRoute(
            self,
            "8001::",
            64,
            [
                VppRoutePath(
                    self.p2p_sub_ifs[0].remote_ip6, self.p2p_sub_ifs[0].sw_if_index
                )
            ],
        )
        route_8001.add_vpp_config()
        route_8002 = VppIpRoute(
            self,
            "8002::",
            64,
            [
                VppRoutePath(
                    self.p2p_sub_ifs[1].remote_ip6, self.p2p_sub_ifs[1].sw_if_index
                )
            ],
        )
        route_8002.add_vpp_config()

        # One packet per route: destinations 8000::100, 8001::100, 8002::100
        for i in range(0, 3):
            self.packets.append(
                self.create_stream(
                    src_mac=self.pg1.remote_mac,
                    dst_mac=self.pg1.local_mac,
                    src_ip=self.pg1.remote_ip6,
                    dst_ip="800%d::100" % i,
                )
            )

        self.send_packets(self.pg1, self.pg0, count=3)

        route_8000.remove_vpp_config()
        route_8001.remove_vpp_config()
        route_8002.remove_vpp_config()

        self.pg0.unconfig_ip6()
        self.logger.info("FFP_TEST_FINISH_0005")

    def test_ip6_tx_p2p_subif_drop(self):
        """drop tx ip6 packet not matching p2p subinterface"""
        self.logger.info("FFP_TEST_START_0006")

        # No route is installed for 9000::100 in this test.
        self.packets.append(
            self.create_stream(
                src_mac="02:03:00:00:ff:ff",
                dst_mac=self.pg0.local_mac,
                src_ip="a000::100",
                dst_ip="9000::100",
            )
        )

        # no packet received
        self.send_packets(self.pg0, self.pg1, count=0)
        self.logger.info("FFP_TEST_FINISH_0006")
|
|
|
|
|
|
class P2PEthernetIPV4(VppTestCase):
    """P2P Ethernet IPv4 tests

    Each test runs with two p2p ethernet sub-interfaces (ids 1 and 2) on
    pg0, created in setUp() and removed in tearDown(), and sends IPv4/UDP
    packets between pg0 and pg1.
    """

    # Both lists are class attributes shared by every test instance:
    # setUp() empties `packets`, tearDown() drains `p2p_sub_ifs`.
    p2p_sub_ifs = []
    packets = []

    @classmethod
    def setUpClass(cls):
        """Create three pg interfaces and configure IPv4 on pg0 and pg1."""
        super(P2PEthernetIPV4, cls).setUpClass()

        # Create pg interfaces
        cls.create_pg_interfaces(range(3))

        # Packet sizes create_stream() picks from when no size is given
        cls.pg_if_packet_sizes = [64, 512, 1518, 9018]

        # Set up all interfaces
        for i in cls.pg_interfaces:
            i.admin_up()

        cls.pg0.config_ip4()
        cls.pg0.generate_remote_hosts(5)
        cls.pg0.configure_ipv4_neighbors()

        cls.pg1.config_ip4()
        cls.pg1.generate_remote_hosts(5)
        cls.pg1.configure_ipv4_neighbors()

    @classmethod
    def tearDownClass(cls):
        """Delegate class-level cleanup to the parent test case."""
        super(P2PEthernetIPV4, cls).tearDownClass()

    def setUp(self):
        """Reset the packet list and create the two p2p sub-interfaces."""
        super(P2PEthernetIPV4, self).setUp()
        # Empty the shared list in place. Removing items while iterating
        # over the same list skips every other element, which used to
        # leave stale packets behind after a test queued more than one.
        self.packets.clear()
        self.p2p_sub_ifs.append(
            self.create_p2p_ethernet(self.pg0, 1, self.pg0._remote_hosts[0].mac)
        )
        self.p2p_sub_ifs.append(
            self.create_p2p_ethernet(self.pg0, 2, self.pg0._remote_hosts[1].mac)
        )
        self.vapi.cli("trace add p2p-ethernet-input 50")

    def tearDown(self):
        """Delete every recorded p2p sub-interface, newest first."""
        while self.p2p_sub_ifs:
            p2p = self.p2p_sub_ifs.pop()
            self.delete_p2p_ethernet(p2p)
        super(P2PEthernetIPV4, self).tearDown()

    def create_stream(
        self, src_mac=None, dst_mac=None, src_ip=None, dst_ip=None, size=None
    ):
        """Build one Ether/IP/UDP packet with a 20-byte 0xa5 payload.

        :param src_mac: Ethernet source address
        :param dst_mac: Ethernet destination address
        :param src_ip: IPv4 source address
        :param dst_ip: IPv4 destination address
        :param size: size passed to extend_packet(); when None a random
            entry of ``pg_if_packet_sizes`` is used
        :returns: the packet after extend_packet() was applied to it
        """
        pkt_size = size
        if size is None:
            pkt_size = random.choice(self.pg_if_packet_sizes)
        p = Ether(src=src_mac, dst=dst_mac)
        p /= IP(src=src_ip, dst=dst_ip)
        p /= UDP(sport=1234, dport=4321) / Raw(b"\xa5" * 20)
        self.extend_packet(p, pkt_size)
        return p

    def send_packets(self, src_if=None, dst_if=None, packets=None, count=None):
        """Send packets on ``src_if`` and return the capture from ``dst_if``.

        :param src_if: interface the stream is added to
        :param dst_if: interface capture is enabled on and read from
        :param packets: packets to send; defaults to ``self.packets``
        :param count: number of packets expected in the capture; defaults
            to ``len(packets)``
        :returns: result of ``dst_if.get_capture(count)``
        """
        self.pg_enable_capture([dst_if])
        if packets is None:
            packets = self.packets
        src_if.add_stream(packets)
        self.pg_start()
        if count is None:
            count = len(packets)
        return dst_if.get_capture(count)

    def create_p2p_ethernet(self, parent_if, sub_id, remote_mac):
        """Create a p2p sub-interface, bring it up and configure IPv4.

        :param parent_if: parent interface handed to VppP2PSubint
        :param sub_id: sub-interface id handed to VppP2PSubint
        :param remote_mac: remote MAC as a string; converted with mac_pton()
        :returns: the new VppP2PSubint, admin-up, with IPv4 configured
        """
        p2p = VppP2PSubint(self, parent_if, sub_id, mac_pton(remote_mac))
        p2p.admin_up()
        p2p.config_ip4()
        return p2p

    def delete_p2p_ethernet(self, p2p):
        """Unconfigure IPv4 on ``p2p``, set it down and delete it via API.

        :param p2p: sub-interface object returned by create_p2p_ethernet()
        """
        p2p.unconfig_ip4()
        p2p.admin_down()
        self.vapi.p2p_ethernet_del(p2p.parent.sw_if_index, p2p.p2p_remote_mac)

    def test_ip4_rx_p2p_subif(self):
        """receive ipv4 packet via p2p subinterface"""
        self.logger.info("FFP_TEST_START_0002")

        route_9000 = VppIpRoute(
            self,
            "9.0.0.0",
            16,
            [VppRoutePath(self.pg1.remote_ip4, self.pg1.sw_if_index)],
        )
        route_9000.add_vpp_config()

        # Source MAC is the remote MAC the first p2p subif was created with.
        self.packets.append(
            self.create_stream(
                src_mac=self.pg0._remote_hosts[0].mac,
                dst_mac=self.pg0.local_mac,
                src_ip=self.p2p_sub_ifs[0].remote_ip4,
                dst_ip="9.0.0.100",
            )
        )

        self.send_packets(self.pg0, self.pg1, self.packets)

        self.assert_packet_counter_equal("p2p-ethernet-input", 1)

        route_9000.remove_vpp_config()
        self.logger.info("FFP_TEST_FINISH_0002")

    def test_ip4_rx_p2p_subif_route(self):
        """route rx packet not matching p2p subinterface"""
        self.logger.info("FFP_TEST_START_0003")

        route_9001 = VppIpRoute(
            self,
            "9.0.0.0",
            24,
            [VppRoutePath(self.pg1.remote_ip4, self.pg1.sw_if_index)],
        )
        route_9001.add_vpp_config()

        # Source MAC differs from the remote MACs used for the p2p subifs.
        self.packets.append(
            self.create_stream(
                src_mac="02:01:00:00:ff:ff",
                dst_mac=self.pg0.local_mac,
                src_ip="8.0.0.100",
                dst_ip="9.0.0.100",
            )
        )

        self.send_packets(self.pg0, self.pg1)

        route_9001.remove_vpp_config()

        self.logger.info("FFP_TEST_FINISH_0003")

    def test_ip4_tx_p2p_subif(self):
        """send ip4 packet via p2p subinterface"""
        self.logger.info("FFP_TEST_START_0005")

        # 9.1.0.100/24 goes via pg0 itself, 9.2.0.100/24 and 9.3.0.100/24
        # go via the first and second p2p sub-interface respectively.
        route_9100 = VppIpRoute(
            self,
            "9.1.0.100",
            24,
            [
                VppRoutePath(
                    self.pg0.remote_ip4,
                    self.pg0.sw_if_index,
                )
            ],
        )
        route_9100.add_vpp_config()
        route_9200 = VppIpRoute(
            self,
            "9.2.0.100",
            24,
            [
                VppRoutePath(
                    self.p2p_sub_ifs[0].remote_ip4,
                    self.p2p_sub_ifs[0].sw_if_index,
                )
            ],
        )
        route_9200.add_vpp_config()
        route_9300 = VppIpRoute(
            self,
            "9.3.0.100",
            24,
            [
                VppRoutePath(
                    self.p2p_sub_ifs[1].remote_ip4, self.p2p_sub_ifs[1].sw_if_index
                )
            ],
        )
        route_9300.add_vpp_config()

        # One packet per route: 9.1.0.100, 9.2.0.100 and 9.3.0.100
        for i in range(0, 3):
            self.packets.append(
                self.create_stream(
                    src_mac=self.pg1.remote_mac,
                    dst_mac=self.pg1.local_mac,
                    src_ip=self.pg1.remote_ip4,
                    dst_ip="9.%d.0.100" % (i + 1),
                )
            )

        self.send_packets(self.pg1, self.pg0)

        route_9100.remove_vpp_config()
        route_9200.remove_vpp_config()
        route_9300.remove_vpp_config()

        self.logger.info("FFP_TEST_FINISH_0005")

    def test_ip4_tx_p2p_subif_drop(self):
        """drop tx ip4 packet not matching p2p subinterface"""
        self.logger.info("FFP_TEST_START_0006")

        # No route is installed for 9.0.0.100 in this test.
        self.packets.append(
            self.create_stream(
                src_mac="02:01:00:00:ff:ff",
                dst_mac=self.pg0.local_mac,
                src_ip="8.0.0.100",
                dst_ip="9.0.0.100",
            )
        )

        # no packet received
        self.send_packets(self.pg0, self.pg1, count=0)
        self.logger.info("FFP_TEST_FINISH_0006")
|
|
|
|
|
|
if __name__ == "__main__":
    # Executed as a script: run this module's tests with the VPP runner.
    unittest.main(testRunner=VppTestRunner)
|