diff --git a/test/test_nsim.py b/test/test_nsim.py new file mode 100644 index 00000000000..16b5fd07882 --- /dev/null +++ b/test/test_nsim.py @@ -0,0 +1,267 @@ +from framework import VppTestCase +from asfframework import VppTestRunner +from config import config +import unittest +import re + +from scapy.layers.l2 import Ether +from scapy.layers.inet import IP, UDP +from scapy.packet import Raw +from random import randint +from util import ppp + + +def create_stream(self, src_if, dst_if, count): + packets = [] + for i in range(count): + # create packet info stored in the test case instance + info = self.create_packet_info(src_if, dst_if) + # convert the info into packet payload + payload = self.info_to_payload(info) + # create the packet itself + p = ( + Ether(dst=src_if.local_mac, src=src_if.remote_mac) + / IP(src=src_if.remote_ip4, dst=dst_if.remote_ip4) + / UDP(sport=randint(49152, 65535), dport=5678) + / Raw(payload) + ) + # store a copy of the packet in the packet info + info.data = p.copy() + # append the packet to the list + packets.append(p) + + # return the created packet list + return packets + + +def verify_capture(self, src_if, dst_if, capture, reply): + packet_info = None + for packet in capture: + try: + ip = packet[IP] + udp = packet[UDP] + # convert the payload to packet info object + payload_info = self.payload_to_info(packet[Raw]) + # make sure the indexes match + self.assert_equal( + payload_info.src, src_if.sw_if_index, "source sw_if_index" + ) + self.assert_equal( + payload_info.dst, dst_if.sw_if_index, "destination sw_if_index" + ) + packet_info = self.get_next_packet_info_for_interface2( + src_if.sw_if_index, dst_if.sw_if_index, packet_info + ) + # make sure we didn't run out of saved packets + self.assertIsNotNone(packet_info) + self.assert_equal( + payload_info.index, packet_info.index, "packet info index" + ) + saved_packet = packet_info.data # fetch the saved packet + # assert the values match + self.assert_equal(ip.src, saved_packet[IP].src, "IP source address") + # ... more assertions here + self.assert_equal(udp.sport, saved_packet[UDP].sport, "UDP source port") + except Exception: + self.logger.error(ppp("Unexpected or invalid packet:", packet)) + raise + remaining_packet = self.get_next_packet_info_for_interface2( + src_if.sw_if_index, dst_if.sw_if_index, packet_info + ) + self.assertIsNone( + remaining_packet, + "Interface %s: Packet expected from interface " + "%s didn't arrive" % (dst_if.name, src_if.name), + ) + + # find timestamps and get actual delay + pattern = r"\d{2}:\d{2}:\d{2}:\d{6}" + timestamps = re.findall(pattern, reply) + actual_delay = int(timestamps[2][9:]) - int(timestamps[0][9:]) + self.assertTrue( + actual_delay >= 100000, f"Delay is lower than expected: {actual_delay} < 100000" + ) + + +@unittest.skipIf("nsim" in config.excluded_plugins, "Exclude NSIM plugin tests") +class TestNsimCli(VppTestCase): + """NSIM plugin tests [CLI]""" + + @classmethod + def setUpClass(cls): + super(TestNsimCli, cls).setUpClass() + try: + cls.create_pg_interfaces(range(2)) + for i in cls.pg_interfaces: + i.config_ip4() + i.resolve_arp() + i.admin_up() + except Exception: + cls.tearDownClass() + raise + + @classmethod + def tearDownClass(cls): + cls.vapi.cli("nsim cross-connect enable-disable pg0 pg1 disable") + cls.vapi.cli("nsim output-feature enable-disable pg0 disable") + for i in cls.pg_interfaces: + i.unconfig_ip4() + i.admin_down() + super(TestNsimCli, cls).tearDownClass() + + def test_nsim_delay(self): + """Add 100ms delay""" + packets = create_stream(self, self.pg0, self.pg1, 5) + self.pg0.add_stream(packets) + self.pg0.enable_capture() + self.pg1.enable_capture() + + self.vapi.cli( + "set nsim delay 100.0 ms bandwidth 1 gbit packet-size 128 drop-fraction 0.0" + ) + self.vapi.cli("nsim cross-connect enable-disable pg0 pg1") + self.vapi.cli("nsim output-feature enable-disable pg0") + + self.pg_start() + capture = self.pg1.get_capture() + self.pg0.assert_nothing_captured() + reply = self.vapi.cli("show trace") + verify_capture(self, self.pg0, self.pg1, capture, reply) + self.assertIn("nsim", reply) + reply = self.vapi.cli("show nsim") + self.assertIn("delay: 100.0 ms", reply) + + def test_nsim_drop(self): + """Drop all packets""" + packets = create_stream(self, self.pg0, self.pg1, 5) + self.pg0.add_stream(packets) + self.vapi.cli("clear trace") + # test fails if running test-debug and no delay is set ("invalid delay 0.00") + self.vapi.cli( + "set nsim delay 1 us bandwidth 1 gbit packet-size 128 drop-fraction 1.0 packets-per-drop 0" + ) + + self.pg_start() + self.pg1.assert_nothing_captured() + reply = self.vapi.cli("show nsim") + self.assertIn("drop fraction: 1.0", reply) + reply = self.vapi.cli("show trace") + self.assertIn("sw_if_index -1", reply) + + +@unittest.skipIf("nsim" in config.excluded_plugins, "Exclude NSIM plugin tests") +class TestNsimApi(VppTestCase): + """NSIM plugin tests [API]""" + + @classmethod + def setUpClass(cls): + super(TestNsimApi, cls).setUpClass() + try: + cls.create_pg_interfaces(range(2)) + for i in cls.pg_interfaces: + i.config_ip4() + i.resolve_arp() + i.admin_up() + except Exception: + cls.tearDownClass() + raise + + @classmethod + def tearDownClass(cls): + cls.vapi.nsim_cross_connect_enable_disable( + enable_disable=False, sw_if_index0=1, sw_if_index1=2 + ) + cls.vapi.nsim_output_feature_enable_disable(enable_disable=False, sw_if_index=1) + for i in cls.pg_interfaces: + i.unconfig_ip4() + i.admin_down() + super(TestNsimApi, cls).tearDownClass() + + def test_nsim_delay(self): + """Add 100ms delay""" + packets = create_stream(self, self.pg0, self.pg1, 5) + self.pg0.add_stream(packets) + self.pg0.enable_capture() + self.pg1.enable_capture() + + # "show nsim" shows 99.9ms if delay is exactly 100000 + self.vapi.nsim_configure2( + delay_in_usec=100001, + average_packet_size=128, + bandwidth_in_bits_per_second=100000000000, + packets_per_drop=0, + packets_per_reorder=0, + ) + self.vapi.nsim_cross_connect_enable_disable( + enable_disable=True, sw_if_index0=1, sw_if_index1=2 + ) + self.vapi.nsim_output_feature_enable_disable(enable_disable=True, sw_if_index=1) + self.pg_start() + capture = self.pg1.get_capture() + reply = self.vapi.cli("show trace") + verify_capture(self, self.pg0, self.pg1, capture, reply) + self.assertIn("nsim", reply) + reply = self.vapi.cli("show nsim") + self.assertIn("delay: 100.0 ms", reply) + + +# has to be separated, otherwise we get "VPP API client: read failed" +# when configuring NSIM (nsim_configure2) and then VPP crashes on teardown +@unittest.skipIf("nsim" in config.excluded_plugins, "Exclude NSIM plugin tests") +class TestNsimApi2(VppTestCase): + """NSIM plugin tests [API]""" + + @classmethod + def setUpClass(cls): + super(TestNsimApi2, cls).setUpClass() + try: + cls.create_pg_interfaces(range(2)) + for i in cls.pg_interfaces: + i.config_ip4() + i.resolve_arp() + i.admin_up() + except Exception: + cls.tearDownClass() + raise + + @classmethod + def tearDownClass(cls): + cls.vapi.nsim_cross_connect_enable_disable( + enable_disable=False, sw_if_index0=1, sw_if_index1=2 + ) + cls.vapi.nsim_output_feature_enable_disable(enable_disable=False, sw_if_index=1) + for i in cls.pg_interfaces: + i.unconfig_ip4() + i.admin_down() + super(TestNsimApi2, cls).tearDownClass() + + def test_nsim_drop(self): + """Drop all packets""" + packets = create_stream(self, self.pg0, self.pg1, 5) + self.pg0.add_stream(packets) + self.pg0.enable_capture() + self.pg1.enable_capture() + self.vapi.cli("clear trace") + + self.vapi.nsim_configure2( + delay_in_usec=10, + average_packet_size=128, + bandwidth_in_bits_per_second=100000000, + packets_per_drop=1, + packets_per_reorder=0, + ) + self.vapi.nsim_cross_connect_enable_disable( + enable_disable=True, sw_if_index0=1, sw_if_index1=2 + ) + self.vapi.nsim_output_feature_enable_disable(enable_disable=True, sw_if_index=1) + + self.pg_start() + self.pg1.assert_nothing_captured() + reply = self.vapi.cli("show nsim") + self.assertIn("drop fraction: 1.0", reply) + reply = self.vapi.cli("show trace") + self.assertIn("sw_if_index -1", reply) + + +if __name__ == "__main__": + unittest.main(testRunner=VppTestRunner)