
* TypeError: assertIsNotNone() got an unexpected keyword argument 'msg' * Correct missing calls to setUpClass/tearDownClass. If you want the setUpClass and tearDownClass on base classes called then you must call up to them yourself. The implementations in TestCase are empty. https://docs.python.org/2/library/unittest.html#setupclass-and-teardownclass Cleans up issues in parallel test mode: ------------- FAILURES AND ERRORS IN TESTS: Testcase name: VCL Thru Host Stack Bidir Nsock FAILURE: test_vcl.VCLThruHostStackBidirNsock.test_vcl_thru_host_stack_bi_dir_nsock [test_vcl.VCLThruHostStackBidirNsock.test_vcl_thru_host_stack_bi_dir_nsock] Testcase name: Bidirectional Forwarding Detection (BFD) (changing auth) ERROR: test_bfd.BFDAuthOnOffTestCase.test_auth_change_key_delayed [test_bfd.BFDAuthOnOffTestCase.test_auth_change_key_delayed] ERROR: test_bfd.BFDAuthOnOffTestCase.test_auth_change_key_immediate [test_bfd.BFDAuthOnOffTestCase.test_auth_change_key_immediate] ERROR: test_bfd.BFDAuthOnOffTestCase.test_auth_off_delayed [test_bfd.BFDAuthOnOffTestCase.test_auth_off_delayed] ERROR: test_bfd.BFDAuthOnOffTestCase.test_auth_off_immediate [test_bfd.BFDAuthOnOffTestCase.test_auth_off_immediate] ERROR: test_bfd.BFDAuthOnOffTestCase.test_auth_on_delayed [test_bfd.BFDAuthOnOffTestCase.test_auth_on_delayed] ERROR: test_bfd.BFDAuthOnOffTestCase.test_auth_on_immediate [test_bfd.BFDAuthOnOffTestCase.test_auth_on_immediate] Testcase name: Punt Socket for IPv4 ERROR: test_punt.TestIP4PuntSocket.test_punt_socket_dump [test_punt.TestIP4PuntSocket.test_punt_socket_dump] ERROR: test_punt.TestIP4PuntSocket.test_punt_socket_traffic_multi_port_multi_sockets [test_punt.TestIP4PuntSocket.test_punt_socket_traffic_multi_port_multi_sockets] ERROR: test_punt.TestIP4PuntSocket.test_punt_socket_traffic_multi_ports_single_socket [test_punt.TestIP4PuntSocket.test_punt_socket_traffic_multi_ports_single_socket] ERROR: test_punt.TestIP4PuntSocket.test_punt_socket_traffic_single_port_single_socket [test_punt.TestIP4PuntSocket.test_punt_socket_traffic_single_port_single_socket] Testcase name: Bidirectional Forwarding Detection (BFD) (IPv6) ERROR: test_bfd.BFD6TestCase.test_echo [test_bfd.BFD6TestCase.test_echo] ERROR: test_bfd.BFD6TestCase.test_echo_looped_back [test_bfd.BFD6TestCase.test_echo_looped_back] ERROR: test_bfd.BFD6TestCase.test_intf_deleted [test_bfd.BFD6TestCase.test_intf_deleted] ERROR: test_bfd.BFD6TestCase.test_session_up [test_bfd.BFD6TestCase.test_session_up] ERROR: test_bfd.BFD6TestCase.test_session_up_by_ip [test_bfd.BFD6TestCase.test_session_up_by_ip] Testcase name: Bidirectional Forwarding Detection (BFD) (CLI) ERROR: test_bfd.BFDCLITestCase.test_add_mod_del_bfd_udp [test_bfd.BFDCLITestCase.test_add_mod_del_bfd_udp] ERROR: test_bfd.BFDCLITestCase.test_add_mod_del_bfd_udp6 [test_bfd.BFDCLITestCase.test_add_mod_del_bfd_udp6] ERROR: test_bfd.BFDCLITestCase.test_add_mod_del_bfd_udp6_auth [test_bfd.BFDCLITestCase.test_add_mod_del_bfd_udp6_auth] ERROR: test_bfd.BFDCLITestCase.test_add_mod_del_bfd_udp_auth [test_bfd.BFDCLITestCase.test_add_mod_del_bfd_udp_auth] ERROR: test_bfd.BFDCLITestCase.test_admin_up_down [test_bfd.BFDCLITestCase.test_admin_up_down] ERROR: test_bfd.BFDCLITestCase.test_auth_on_off [test_bfd.BFDCLITestCase.test_auth_on_off] ERROR: test_bfd.BFDCLITestCase.test_auth_on_off_delayed [test_bfd.BFDCLITestCase.test_auth_on_off_delayed] ERROR: test_bfd.BFDCLITestCase.test_set_del_meticulous_sha1_key [test_bfd.BFDCLITestCase.test_set_del_meticulous_sha1_key] ERROR: test_bfd.BFDCLITestCase.test_set_del_sha1_key [test_bfd.BFDCLITestCase.test_set_del_sha1_key] ERROR: test_bfd.BFDCLITestCase.test_set_del_udp_echo_source [test_bfd.BFDCLITestCase.test_set_del_udp_echo_source] ERROR: test_bfd.BFDCLITestCase.test_show [test_bfd.BFDCLITestCase.test_show] Testcase name: VAPI test ERROR: test_vapi.VAPITestCase.test_vapi_c [test_vapi.VAPITestCase.test_vapi_c] Testcase name: Container integration extended testcases ERROR: test_container.ContainerIntegrationTestCase.test_0010_basic_conn_test [test_container.ContainerIntegrationTestCase.test_0010_basic_conn_test] ERROR: test_container.ContainerIntegrationTestCase.test_0011_basic_conn_test [test_container.ContainerIntegrationTestCase.test_0011_basic_conn_test] ERROR: test_container.ContainerIntegrationTestCase.test_0050_loopback_prepare_test [test_container.ContainerIntegrationTestCase.test_0050_loopback_prepare_test] ERROR: test_container.ContainerIntegrationTestCase.test_0110_basic_conn_test [test_container.ContainerIntegrationTestCase.test_0110_basic_conn_test] ERROR: test_container.ContainerIntegrationTestCase.test_0111_basic_conn_test [test_container.ContainerIntegrationTestCase.test_0111_basic_conn_test] ERROR: test_container.ContainerIntegrationTestCase.test_0200_basic_conn_test [test_container.ContainerIntegrationTestCase.test_0200_basic_conn_test] ERROR: test_container.ContainerIntegrationTestCase.test_0210_basic_conn_test [test_container.ContainerIntegrationTestCase.test_0210_basic_conn_test] ERROR: test_container.ContainerIntegrationTestCase.test_0211_basic_conn_test [test_container.ContainerIntegrationTestCase.test_0211_basic_conn_test] ERROR: test_container.ContainerIntegrationTestCase.test_0300_unconfigure_commands [test_container.ContainerIntegrationTestCase.test_0300_unconfigure_commands] ERROR: test_container.ContainerIntegrationTestCase.test_0410_spoof_test [test_container.ContainerIntegrationTestCase.test_0410_spoof_test] ERROR: test_container.ContainerIntegrationTestCase.test_0411_spoof_test [test_container.ContainerIntegrationTestCase.test_0411_spoof_test] Testcase name: Re-enable IPFIX ERROR: test_flowprobe.ReenableIPFIX.test_0011 [test_flowprobe.ReenableIPFIX.test_0011] Testcase name: VXLAN over IPv6 Test Case ERROR: setUpClass [setUpClass (test_vxlan6.TestVxlan6)] Testcase name: JVPP Core Test Case ERROR: test_jvpp.TestJVpp.test_vpp_acl_callback_api [test_jvpp.TestJVpp.test_vpp_acl_callback_api] ERROR: test_jvpp.TestJVpp.test_vpp_acl_future_api [test_jvpp.TestJVpp.test_vpp_acl_future_api] ERROR: test_jvpp.TestJVpp.test_vpp_core_callback_api [test_jvpp.TestJVpp.test_vpp_core_callback_api] ERROR: test_jvpp.TestJVpp.test_vpp_core_future_api [test_jvpp.TestJVpp.test_vpp_core_future_api] ERROR: test_jvpp.TestJVpp.test_vpp_ioamexport_callback_api [test_jvpp.TestJVpp.test_vpp_ioamexport_callback_api] ERROR: test_jvpp.TestJVpp.test_vpp_ioamexport_future_api [test_jvpp.TestJVpp.test_vpp_ioamexport_future_api] ERROR: test_jvpp.TestJVpp.test_vpp_ioampot_callback_api [test_jvpp.TestJVpp.test_vpp_ioampot_callback_api] ERROR: test_jvpp.TestJVpp.test_vpp_ioampot_future_api [test_jvpp.TestJVpp.test_vpp_ioampot_future_api] ERROR: test_jvpp.TestJVpp.test_vpp_ioamtrace_callback_api [test_jvpp.TestJVpp.test_vpp_ioamtrace_callback_api] ERROR: test_jvpp.TestJVpp.test_vpp_ioamtrace_future_api [test_jvpp.TestJVpp.test_vpp_ioamtrace_future_api] ERROR: test_jvpp.TestJVpp.test_vpp_snat_callback_api [test_jvpp.TestJVpp.test_vpp_snat_callback_api] ERROR: test_jvpp.TestJVpp.test_vpp_snat_future_api [test_jvpp.TestJVpp.test_vpp_snat_future_api] Testcase name: LDP Cut Thru Tests FAILURE: test_vcl.LDPCutThruTestCase.test_ldp_cut_thru_iperf3 [test_vcl.LDPCutThruTestCase.test_ldp_cut_thru_iperf3] Testcase name: ACL plugin connection-oriented extended testcases ERROR: test_acl_plugin_conns.ACLPluginConnTestCase.test_0000_conn_prepare_test [test_acl_plugin_conns.ACLPluginConnTestCase.test_0000_conn_prepare_test] ERROR: test_acl_plugin_conns.ACLPluginConnTestCase.test_0001_basic_conn_test [test_acl_plugin_conns.ACLPluginConnTestCase.test_0001_basic_conn_test] ERROR: test_acl_plugin_conns.ACLPluginConnTestCase.test_0002_basic_conn_test [test_acl_plugin_conns.ACLPluginConnTestCase.test_0002_basic_conn_test] ERROR: test_acl_plugin_conns.ACLPluginConnTestCase.test_0005_clear_conn_test [test_acl_plugin_conns.ACLPluginConnTestCase.test_0005_clear_conn_test] ERROR: test_acl_plugin_conns.ACLPluginConnTestCase.test_0006_clear_conn_test [test_acl_plugin_conns.ACLPluginConnTestCase.test_0006_clear_conn_test] ERROR: test_acl_plugin_conns.ACLPluginConnTestCase.test_0011_active_conn_test [test_acl_plugin_conns.ACLPluginConnTestCase.test_0011_active_conn_test] ERROR: test_acl_plugin_conns.ACLPluginConnTestCase.test_0012_active_conn_test [test_acl_plugin_conns.ACLPluginConnTestCase.test_0012_active_conn_test] ERROR: test_acl_plugin_conns.ACLPluginConnTestCase.test_1001_basic_conn_test [test_acl_plugin_conns.ACLPluginConnTestCase.test_1001_basic_conn_test] ERROR: test_acl_plugin_conns.ACLPluginConnTestCase.test_1002_basic_conn_test [test_acl_plugin_conns.ACLPluginConnTestCase.test_1002_basic_conn_test] ERROR: test_acl_plugin_conns.ACLPluginConnTestCase.test_1005_clear_conn_test [test_acl_plugin_conns.ACLPluginConnTestCase.test_1005_clear_conn_test] ERROR: test_acl_plugin_conns.ACLPluginConnTestCase.test_1006_clear_conn_test [test_acl_plugin_conns.ACLPluginConnTestCase.test_1006_clear_conn_test] ERROR: test_acl_plugin_conns.ACLPluginConnTestCase.test_1011_active_conn_test [test_acl_plugin_conns.ACLPluginConnTestCase.test_1011_active_conn_test] ERROR: test_acl_plugin_conns.ACLPluginConnTestCase.test_1012_active_conn_test [test_acl_plugin_conns.ACLPluginConnTestCase.test_1012_active_conn_test] ERROR: test_acl_plugin_conns.ACLPluginConnTestCase.test_2000_prepare_for_tcp_test [test_acl_plugin_conns.ACLPluginConnTestCase.test_2000_prepare_for_tcp_test] ERROR: test_acl_plugin_conns.ACLPluginConnTestCase.test_2001_tcp_transient_conn_test [test_acl_plugin_conns.ACLPluginConnTestCase.test_2001_tcp_transient_conn_test] ERROR: test_acl_plugin_conns.ACLPluginConnTestCase.test_2002_tcp_transient_conn_test [test_acl_plugin_conns.ACLPluginConnTestCase.test_2002_tcp_transient_conn_test] ERROR: test_acl_plugin_conns.ACLPluginConnTestCase.test_2003_tcp_transient_conn_test [test_acl_plugin_conns.ACLPluginConnTestCase.test_2003_tcp_transient_conn_test] ERROR: test_acl_plugin_conns.ACLPluginConnTestCase.test_2004_tcp_transient_conn_test [test_acl_plugin_conns.ACLPluginConnTestCase.test_2004_tcp_transient_conn_test] ERROR: test_acl_plugin_conns.ACLPluginConnTestCase.test_2005_tcp_transient_teardown_conn_test [test_acl_plugin_conns.ACLPluginConnTestCase.test_2005_tcp_transient_teardown_conn_test] ERROR: test_acl_plugin_conns.ACLPluginConnTestCase.test_2006_tcp_transient_teardown_conn_test [test_acl_plugin_conns.ACLPluginConnTestCase.test_2006_tcp_transient_teardown_conn_test] ERROR: test_acl_plugin_conns.ACLPluginConnTestCase.test_3001_tcp_transient_conn_test [test_acl_plugin_conns.ACLPluginConnTestCase.test_3001_tcp_transient_conn_test] ERROR: test_acl_plugin_conns.ACLPluginConnTestCase.test_3002_tcp_transient_conn_test [test_acl_plugin_conns.ACLPluginConnTestCase.test_3002_tcp_transient_conn_test] ERROR: test_acl_plugin_conns.ACLPluginConnTestCase.test_3003_tcp_transient_conn_test [test_acl_plugin_conns.ACLPluginConnTestCase.test_3003_tcp_transient_conn_test] ERROR: test_acl_plugin_conns.ACLPluginConnTestCase.test_3004_tcp_transient_conn_test [test_acl_plugin_conns.ACLPluginConnTestCase.test_3004_tcp_transient_conn_test] ERROR: test_acl_plugin_conns.ACLPluginConnTestCase.test_3005_tcp_transient_teardown_conn_test [test_acl_plugin_conns.ACLPluginConnTestCase.test_3005_tcp_transient_teardown_conn_test] ERROR: test_acl_plugin_conns.ACLPluginConnTestCase.test_3006_tcp_transient_teardown_conn_test [test_acl_plugin_conns.ACLPluginConnTestCase.test_3006_tcp_transient_teardown_conn_test] Testcase name: LDP Thru Host Stack Echo FAILURE: test_vcl.LDPThruHostStackEcho.test_ldp_thru_host_stack_echo [test_vcl.LDPThruHostStackEcho.test_ldp_thru_host_stack_echo] Testcase name: Bidirectional Forwarding Detection (BFD) - API ERROR: test_bfd.BFDAPITestCase.test_activate_auth [test_bfd.BFDAPITestCase.test_activate_auth] ERROR: test_bfd.BFDAPITestCase.test_add_auth_nonexistent_key [test_bfd.BFDAPITestCase.test_add_auth_nonexistent_key] ERROR: test_bfd.BFDAPITestCase.test_add_bfd [test_bfd.BFDAPITestCase.test_add_bfd] ERROR: test_bfd.BFDAPITestCase.test_add_bfd6 [test_bfd.BFDAPITestCase.test_add_bfd6] ERROR: test_bfd.BFDAPITestCase.test_add_bfd_sha1 [test_bfd.BFDAPITestCase.test_add_bfd_sha1] ERROR: test_bfd.BFDAPITestCase.test_add_sha1_keys [test_bfd.BFDAPITestCase.test_add_sha1_keys] ERROR: test_bfd.BFDAPITestCase.test_change_key [test_bfd.BFDAPITestCase.test_change_key] ERROR: test_bfd.BFDAPITestCase.test_deactivate_auth [test_bfd.BFDAPITestCase.test_deactivate_auth] ERROR: test_bfd.BFDAPITestCase.test_double_add [test_bfd.BFDAPITestCase.test_double_add] ERROR: test_bfd.BFDAPITestCase.test_double_add_sha1 [test_bfd.BFDAPITestCase.test_double_add_sha1] ERROR: test_bfd.BFDAPITestCase.test_mod_bfd [test_bfd.BFDAPITestCase.test_mod_bfd] ERROR: test_bfd.BFDAPITestCase.test_set_del_udp_echo_source [test_bfd.BFDAPITestCase.test_set_del_udp_echo_source] ERROR: test_bfd.BFDAPITestCase.test_shared_sha1_key [test_bfd.BFDAPITestCase.test_shared_sha1_key] Testcase name: LDP Thru Host Stack Iperf FAILURE: test_vcl.LDPThruHostStackIperf.test_ldp_thru_host_stack_iperf3 [test_vcl.LDPThruHostStackIperf.test_ldp_thru_host_stack_iperf3] Testcase name: Bidirectional Forwarding Detection (BFD) (SHA1 auth) ERROR: test_bfd.BFDSHA1TestCase.test_session_up [test_bfd.BFDSHA1TestCase.test_session_up] Testcase name: Punt Socket for IPv6 ERROR: test_punt.TestIP6PuntSocket.test_punt_socket_dump [test_punt.TestIP6PuntSocket.test_punt_socket_dump] ERROR: test_punt.TestIP6PuntSocket.test_punt_socket_traffic_multi_port_multi_sockets [test_punt.TestIP6PuntSocket.test_punt_socket_traffic_multi_port_multi_sockets] ERROR: test_punt.TestIP6PuntSocket.test_punt_socket_traffic_multi_ports_single_socket [test_punt.TestIP6PuntSocket.test_punt_socket_traffic_multi_ports_single_socket] ERROR: test_punt.TestIP6PuntSocket.test_punt_socket_traffic_single_port_single_socket [test_punt.TestIP6PuntSocket.test_punt_socket_traffic_single_port_single_socket] Testcase name: Disable Flowprobe feature ERROR: test_flowprobe.DisableFP.test_0001 [test_flowprobe.DisableFP.test_0001] Testcase name: Disable IPFIX ERROR: test_flowprobe.DisableIPFIX.test_0001 [test_flowprobe.DisableIPFIX.test_0001] Testcase name: VPP Object Model Test ERROR: test_vom.VOMTestCase.test_vom_cpp [test_vom.VOMTestCase.test_vom_cpp] Testcase name: BFD-FIB interactions (IPv6) ERROR: test_bfd.BFDFIBTestCase.test_session_with_fib [test_bfd.BFDFIBTestCase.test_session_with_fib] Testcase name: Bidirectional Forwarding Detection (BFD) ERROR: test_bfd.BFD4TestCase.test_echo [test_bfd.BFD4TestCase.test_echo] ERROR: test_bfd.BFD4TestCase.test_echo_looped_back [test_bfd.BFD4TestCase.test_echo_looped_back] ERROR: test_bfd.BFD4TestCase.test_intf_deleted [test_bfd.BFD4TestCase.test_intf_deleted] ERROR: test_bfd.BFD4TestCase.test_session_down [test_bfd.BFD4TestCase.test_session_down] ERROR: test_bfd.BFD4TestCase.test_session_up [test_bfd.BFD4TestCase.test_session_up] ERROR: test_bfd.BFD4TestCase.test_session_up_by_ip [test_bfd.BFD4TestCase.test_session_up_by_ip] Testcase name: VXLAN-GPE Test Case ERROR: test_vxlan_gpe.TestVxlanGpe.test_decap [test_vxlan_gpe.TestVxlanGpe.test_decap] ERROR: test_vxlan_gpe.TestVxlanGpe.test_encap [test_vxlan_gpe.TestVxlanGpe.test_encap] ERROR: test_vxlan_gpe.TestVxlanGpe.test_ucast_flood [test_vxlan_gpe.TestVxlanGpe.test_ucast_flood] Testcase name: Re-enable Flowprobe feature ERROR: test_flowprobe.ReenableFP.test_0001 [test_flowprobe.ReenableFP.test_0001] Testcase name: LDP IPv6 Cut Thru Tests FAILURE: test_vcl.LDPIpv6CutThruTestCase.test_ldp_ipv6_cut_thru_echo [test_vcl.LDPIpv6CutThruTestCase.test_ldp_ipv6_cut_thru_echo] TESTCASES WHERE NO TESTS WERE SUCCESSFULLY EXECUTED: VXLAN over IPv6 Test Case ============================================================================== 0 attempt(s) left. Change-Id: Id202ed6ee7f57670f34ec87380244c568b509416 Signed-off-by: Paul Vinciguerra <pvinci@vinciconsulting.com>
652 lines
21 KiB
Python
652 lines
21 KiB
Python
#!/usr/bin/env python
|
|
import binascii
|
|
import random
|
|
import socket
|
|
import unittest
|
|
import os
|
|
import scapy.layers.inet6 as inet6
|
|
import threading
|
|
import struct
|
|
|
|
from struct import unpack, unpack_from
|
|
from util import ppp, ppc
|
|
from re import compile
|
|
from scapy.packet import Raw
|
|
from scapy.layers.l2 import Ether
|
|
from scapy.layers.inet import IP, UDP, ICMP
|
|
from scapy.layers.inet6 import IPv6, ICMPv6DestUnreach
|
|
from framework import VppTestCase, VppTestRunner
|
|
|
|
|
|
# Format MAC Address
|
|
def get_mac_addr(bytes_addr):
|
|
return ':'.join('%02x' % ord(b) for b in bytes_addr)
|
|
|
|
|
|
# Format IP Address
|
|
def ipv4(bytes_addr):
|
|
return '.'.join('%d' % ord(b) for b in bytes_addr)
|
|
|
|
|
|
# Unpack Ethernet Frame
|
|
def ethernet_frame(data):
|
|
dest_mac, src_mac, proto = struct.unpack('! 6s 6s H', data[:14])
|
|
return dest_mac, src_mac, socket.htons(proto), data[14:]
|
|
|
|
|
|
# Unpack IPv4 Packets
|
|
def ipv4_packet(data):
|
|
proto, src, target = struct.unpack('! 8x 1x B 2x 4s 4s', data[:20])
|
|
return proto, src, target, data[20:]
|
|
|
|
|
|
# Unpack IPv6 Packets
|
|
def ipv6_packet(data):
|
|
nh, src, target = struct.unpack('! 6x B 1x 16s 16s', data[:40])
|
|
return nh, src, target, data[40:]
|
|
|
|
|
|
# Unpacks any UDP Packet
|
|
def udp_seg(data):
|
|
src_port, dest_port, size = struct.unpack('! H H 2x H', data[:8])
|
|
return src_port, dest_port, size, data[8:]
|
|
|
|
|
|
# Unpacks any TCP Packet
|
|
def tcp_seg(data):
|
|
src_port, dest_port, seq, flag = struct.unpack('! H H L 4x H', data[:14])
|
|
return src_port, dest_port, seq, data[((flag >> 12) * 4):]
|
|
|
|
|
|
def receivePackets(sock, counters):
|
|
# Wait for some packets on socket
|
|
while True:
|
|
data = sock.recv(65536)
|
|
|
|
# punt socket metadata
|
|
# packet_desc = data[0:8]
|
|
|
|
# Ethernet
|
|
_, _, eth_proto, data = ethernet_frame(data[8:])
|
|
# Ipv4
|
|
if eth_proto == 8:
|
|
proto, _, _, data = ipv4_packet(data)
|
|
# TCP
|
|
if proto == 6:
|
|
_, dst_port, _, data = udp_seg(data)
|
|
# UDP
|
|
elif proto == 17:
|
|
_, dst_port, _, data = udp_seg(data)
|
|
counters[dst_port] = 0
|
|
# Ipv6
|
|
elif eth_proto == 0xdd86:
|
|
nh, _, _, data = ipv6_packet(data)
|
|
# TCP
|
|
if nh == 6:
|
|
_, dst_port, _, data = udp_seg(data)
|
|
# UDP
|
|
elif nh == 17:
|
|
_, dst_port, _, data = udp_seg(data)
|
|
counters[dst_port] = 0
|
|
|
|
|
|
class serverSocketThread(threading.Thread):
|
|
""" Socket server thread"""
|
|
|
|
def __init__(self, threadID, sockName, counters):
|
|
threading.Thread.__init__(self)
|
|
self.threadID = threadID
|
|
self.sockName = sockName
|
|
self.sock = None
|
|
self.counters = counters
|
|
|
|
def run(self):
|
|
self.sock = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM)
|
|
try:
|
|
os.unlink(self.sockName)
|
|
except:
|
|
pass
|
|
self.sock.bind(self.sockName)
|
|
|
|
receivePackets(self.sock, self.counters)
|
|
|
|
|
|
class TestPuntSocket(VppTestCase):
|
|
""" Punt Socket """
|
|
|
|
ports = [1111, 2222, 3333, 4444]
|
|
sock_servers = list()
|
|
portsCheck = dict()
|
|
nr_packets = 256
|
|
|
|
@classmethod
|
|
def setUpClass(cls):
|
|
super(TestPuntSocket, cls).setUpClass()
|
|
|
|
@classmethod
|
|
def tearDownClass(cls):
|
|
super(TestPuntSocket, cls).tearDownClass()
|
|
|
|
@classmethod
|
|
def setUpConstants(cls):
|
|
cls.extra_vpp_punt_config = [
|
|
"punt", "{", "socket", cls.tempdir+"/socket_punt", "}"]
|
|
super(TestPuntSocket, cls).setUpConstants()
|
|
|
|
def setUp(self):
|
|
super(TestPuntSocket, self).setUp()
|
|
random.seed()
|
|
|
|
self.create_pg_interfaces(range(2))
|
|
for i in self.pg_interfaces:
|
|
i.admin_up()
|
|
|
|
def tearDown(self):
|
|
del self.sock_servers[:]
|
|
super(TestPuntSocket, self).tearDown()
|
|
|
|
def socket_client_create(self, sock_name, id=None):
|
|
thread = serverSocketThread(id, sock_name, self.portsCheck)
|
|
self.sock_servers.append(thread)
|
|
thread.start()
|
|
|
|
def socket_client_close(self):
|
|
for thread in self.sock_servers:
|
|
thread.sock.close()
|
|
|
|
|
|
class TestIP4PuntSocket(TestPuntSocket):
|
|
""" Punt Socket for IPv4 """
|
|
|
|
@classmethod
|
|
def setUpClass(cls):
|
|
super(TestIP4PuntSocket, cls).setUpClass()
|
|
|
|
@classmethod
|
|
def tearDownClass(cls):
|
|
super(TestIP4PuntSocket, cls).tearDownClass()
|
|
|
|
def setUp(self):
|
|
super(TestIP4PuntSocket, self).setUp()
|
|
|
|
for i in self.pg_interfaces:
|
|
i.config_ip4()
|
|
i.resolve_arp()
|
|
|
|
def tearDown(self):
|
|
super(TestIP4PuntSocket, self).tearDown()
|
|
for i in self.pg_interfaces:
|
|
i.unconfig_ip4()
|
|
i.admin_down()
|
|
|
|
def test_punt_socket_dump(self):
|
|
""" Punt socket registration/deregistration"""
|
|
|
|
punts = self.vapi.punt_socket_dump(is_ip6=0)
|
|
self.assertEqual(len(punts), 0)
|
|
|
|
#
|
|
# configure a punt socket
|
|
#
|
|
self.vapi.punt_socket_register(1111, self.tempdir+"/socket_punt_1111")
|
|
self.vapi.punt_socket_register(2222, self.tempdir+"/socket_punt_2222")
|
|
punts = self.vapi.punt_socket_dump(is_ip6=0)
|
|
self.assertEqual(len(punts), 2)
|
|
self.assertEqual(punts[0].punt.l4_port, 1111)
|
|
self.assertEqual(punts[1].punt.l4_port, 2222)
|
|
|
|
#
|
|
# deregister a punt socket
|
|
#
|
|
self.vapi.punt_socket_deregister(1111)
|
|
punts = self.vapi.punt_socket_dump(is_ip6=0)
|
|
self.assertEqual(len(punts), 1)
|
|
|
|
#
|
|
# configure a punt socket again
|
|
#
|
|
self.vapi.punt_socket_register(1111, self.tempdir+"/socket_punt_1111")
|
|
self.vapi.punt_socket_register(3333, self.tempdir+"/socket_punt_3333")
|
|
punts = self.vapi.punt_socket_dump(is_ip6=0)
|
|
self.assertEqual(len(punts), 3)
|
|
|
|
#
|
|
# deregister all punt socket
|
|
#
|
|
self.vapi.punt_socket_deregister(1111)
|
|
self.vapi.punt_socket_deregister(2222)
|
|
self.vapi.punt_socket_deregister(3333)
|
|
punts = self.vapi.punt_socket_dump(is_ip6=0)
|
|
self.assertEqual(len(punts), 0)
|
|
|
|
def test_punt_socket_traffic_single_port_single_socket(self):
|
|
""" Punt socket traffic single port single socket"""
|
|
|
|
port = self.ports[0]
|
|
|
|
p = (Ether(src=self.pg0.remote_mac,
|
|
dst=self.pg0.local_mac) /
|
|
IP(src=self.pg0.remote_ip4, dst=self.pg0.local_ip4) /
|
|
UDP(sport=9876, dport=port) /
|
|
Raw('\xa5' * 100))
|
|
|
|
pkts = p * self.nr_packets
|
|
self.portsCheck[port] = self.nr_packets
|
|
|
|
punts = self.vapi.punt_socket_dump(is_ip6=0)
|
|
self.assertEqual(len(punts), 0)
|
|
|
|
#
|
|
# expect ICMP - port unreachable for all packets
|
|
#
|
|
self.vapi.cli("clear trace")
|
|
self.pg0.add_stream(pkts)
|
|
self.pg_enable_capture(self.pg_interfaces)
|
|
self.pg_start()
|
|
# FIXME - when punt socket deregister is implemented
|
|
# rx = self.pg0.get_capture(self.nr_packets)
|
|
# for p in rx:
|
|
# self.assertEqual(int(p[IP].proto), 1) # ICMP
|
|
# self.assertEqual(int(p[ICMP].code), 3) # unreachable
|
|
|
|
#
|
|
# configure a punt socket
|
|
#
|
|
self.socket_client_create(self.tempdir+"/socket_" + str(port))
|
|
self.vapi.punt_socket_register(port, self.tempdir+"/socket_" +
|
|
str(port))
|
|
punts = self.vapi.punt_socket_dump(is_ip6=0)
|
|
self.assertEqual(len(punts), 1)
|
|
|
|
self.logger.debug("Sending %s packets to port %d",
|
|
str(self.portsCheck[port]), port)
|
|
#
|
|
# expect punt socket and no packets on pg0
|
|
#
|
|
self.vapi.cli("clear errors")
|
|
self.vapi.cli("clear trace")
|
|
self.pg0.add_stream(pkts)
|
|
self.pg_enable_capture(self.pg_interfaces)
|
|
self.pg_start()
|
|
self.pg0.get_capture(0)
|
|
self.logger.info(self.vapi.cli("show trace"))
|
|
self.socket_client_close()
|
|
self.assertEqual(self.portsCheck[port], 0)
|
|
|
|
#
|
|
# remove punt socket. expect ICMP - port unreachable for all packets
|
|
#
|
|
self.vapi.punt_socket_deregister(port)
|
|
punts = self.vapi.punt_socket_dump(is_ip6=0)
|
|
self.assertEqual(len(punts), 0)
|
|
self.pg0.add_stream(pkts)
|
|
self.pg_enable_capture(self.pg_interfaces)
|
|
self.pg_start()
|
|
# FIXME - when punt socket deregister is implemented
|
|
# self.pg0.get_capture(nr_packets)
|
|
|
|
def test_punt_socket_traffic_multi_port_multi_sockets(self):
|
|
""" Punt socket traffic multi ports and multi sockets"""
|
|
|
|
for p in self.ports:
|
|
self.portsCheck[p] = 0
|
|
|
|
#
|
|
# create stream with random pakets count per given ports
|
|
#
|
|
pkts = list()
|
|
for _ in range(0, self.nr_packets):
|
|
# choose port from port list
|
|
p = random.choice(self.ports)
|
|
pkts.append((
|
|
Ether(src=self.pg0.remote_mac,
|
|
dst=self.pg0.local_mac) /
|
|
IP(src=self.pg0.remote_ip4, dst=self.pg0.local_ip4) /
|
|
UDP(sport=9876, dport=p) /
|
|
Raw('\xa5' * 100)))
|
|
self.portsCheck[p] += 1
|
|
#
|
|
# no punt socket
|
|
#
|
|
punts = self.vapi.punt_socket_dump(is_ip6=0)
|
|
self.assertEqual(len(punts), 0)
|
|
|
|
#
|
|
# configure a punt socket
|
|
#
|
|
for p in self.ports:
|
|
self.socket_client_create(self.tempdir+"/socket_" + str(p))
|
|
self.vapi.punt_socket_register(p, self.tempdir+"/socket_" + str(p))
|
|
punts = self.vapi.punt_socket_dump(is_ip6=0)
|
|
self.assertEqual(len(punts), len(self.ports))
|
|
|
|
for p in self.ports:
|
|
self.logger.debug("Sending %s packets to port %d",
|
|
str(self.portsCheck[p]), p)
|
|
|
|
#
|
|
# expect punt socket and no packets on pg0
|
|
#
|
|
self.vapi.cli("clear errors")
|
|
self.vapi.cli("clear trace")
|
|
self.pg0.add_stream(pkts)
|
|
self.pg_enable_capture(self.pg_interfaces)
|
|
self.pg_start()
|
|
self.pg0.get_capture(0)
|
|
self.logger.info(self.vapi.cli("show trace"))
|
|
self.socket_client_close()
|
|
|
|
for p in self.ports:
|
|
self.assertEqual(self.portsCheck[p], 0)
|
|
self.vapi.punt_socket_deregister(p)
|
|
punts = self.vapi.punt_socket_dump(is_ip6=0)
|
|
self.assertEqual(len(punts), 0)
|
|
|
|
def test_punt_socket_traffic_multi_ports_single_socket(self):
|
|
""" Punt socket traffic multi ports and single socket"""
|
|
|
|
for p in self.ports:
|
|
self.portsCheck[p] = 0
|
|
|
|
#
|
|
# create stream with random pakets count per given ports
|
|
#
|
|
pkts = list()
|
|
for _ in range(0, self.nr_packets):
|
|
# choose port from port list
|
|
p = random.choice(self.ports)
|
|
pkts.append((
|
|
Ether(src=self.pg0.remote_mac,
|
|
dst=self.pg0.local_mac) /
|
|
IP(src=self.pg0.remote_ip4, dst=self.pg0.local_ip4) /
|
|
UDP(sport=9876, dport=p) /
|
|
Raw('\xa5' * 100)))
|
|
self.portsCheck[p] += 1
|
|
|
|
#
|
|
# no punt socket
|
|
#
|
|
punts = self.vapi.punt_socket_dump(is_ip6=0)
|
|
self.assertEqual(len(punts), 0)
|
|
|
|
# configure a punt socket
|
|
#
|
|
self.socket_client_create(self.tempdir+"/socket_multi")
|
|
for p in self.ports:
|
|
self.vapi.punt_socket_register(p, self.tempdir+"/socket_multi")
|
|
punts = self.vapi.punt_socket_dump(is_ip6=0)
|
|
self.assertEqual(len(punts), len(self.ports))
|
|
|
|
for p in self.ports:
|
|
self.logger.debug("Sending %s packets to port %d",
|
|
str(self.portsCheck[p]), p)
|
|
#
|
|
# expect punt socket and no packets on pg0
|
|
#
|
|
self.vapi.cli("clear errors")
|
|
self.vapi.cli("clear trace")
|
|
self.pg0.add_stream(pkts)
|
|
self.pg_enable_capture(self.pg_interfaces)
|
|
self.pg_start()
|
|
self.pg0.get_capture(0)
|
|
self.logger.info(self.vapi.cli("show trace"))
|
|
self.socket_client_close()
|
|
|
|
for p in self.ports:
|
|
self.assertEqual(self.portsCheck[p], 0)
|
|
self.vapi.punt_socket_deregister(p)
|
|
punts = self.vapi.punt_socket_dump(is_ip6=0)
|
|
self.assertEqual(len(punts), 0)
|
|
|
|
|
|
class TestIP6PuntSocket(TestPuntSocket):
|
|
""" Punt Socket for IPv6"""
|
|
|
|
@classmethod
|
|
def setUpClass(cls):
|
|
super(TestIP6PuntSocket, cls).setUpClass()
|
|
|
|
@classmethod
|
|
def tearDownClass(cls):
|
|
super(TestIP6PuntSocket, cls).tearDownClass()
|
|
|
|
def setUp(self):
|
|
super(TestIP6PuntSocket, self).setUp()
|
|
|
|
for i in self.pg_interfaces:
|
|
i.config_ip6()
|
|
i.resolve_ndp()
|
|
|
|
def tearDown(self):
|
|
super(TestIP6PuntSocket, self).tearDown()
|
|
for i in self.pg_interfaces:
|
|
i.unconfig_ip6()
|
|
i.admin_down()
|
|
|
|
def test_punt_socket_dump(self):
|
|
""" Punt socket registration """
|
|
|
|
punts = self.vapi.punt_socket_dump(is_ip6=1)
|
|
self.assertEqual(len(punts), 0)
|
|
|
|
#
|
|
# configure a punt socket
|
|
#
|
|
self.vapi.punt_socket_register(1111, self.tempdir+"/socket_1111",
|
|
is_ip4=0)
|
|
self.vapi.punt_socket_register(2222, self.tempdir+"/socket_2222",
|
|
is_ip4=0)
|
|
punts = self.vapi.punt_socket_dump(is_ip6=1)
|
|
self.assertEqual(len(punts), 2)
|
|
self.assertEqual(punts[0].punt.l4_port, 1111)
|
|
self.assertEqual(punts[1].punt.l4_port, 2222)
|
|
|
|
#
|
|
# deregister a punt socket
|
|
#
|
|
self.vapi.punt_socket_deregister(1111, is_ip4=0)
|
|
punts = self.vapi.punt_socket_dump(is_ip6=1)
|
|
self.assertEqual(len(punts), 1)
|
|
|
|
#
|
|
# configure a punt socket again
|
|
#
|
|
self.vapi.punt_socket_register(1111, self.tempdir+"/socket_1111",
|
|
is_ip4=0)
|
|
punts = self.vapi.punt_socket_dump(is_ip6=1)
|
|
self.assertEqual(len(punts), 2)
|
|
|
|
#
|
|
# deregister all punt socket
|
|
#
|
|
self.vapi.punt_socket_deregister(1111, is_ip4=0)
|
|
self.vapi.punt_socket_deregister(2222, is_ip4=0)
|
|
self.vapi.punt_socket_deregister(3333, is_ip4=0)
|
|
punts = self.vapi.punt_socket_dump(is_ip6=1)
|
|
self.assertEqual(len(punts), 0)
|
|
|
|
def test_punt_socket_traffic_single_port_single_socket(self):
|
|
""" Punt socket traffic single port single socket"""
|
|
|
|
port = self.ports[0]
|
|
|
|
p = (Ether(src=self.pg0.remote_mac,
|
|
dst=self.pg0.local_mac) /
|
|
IPv6(src=self.pg0.remote_ip6, dst=self.pg0.local_ip6) /
|
|
inet6.UDP(sport=9876, dport=port) /
|
|
Raw('\xa5' * 100))
|
|
|
|
pkts = p * self.nr_packets
|
|
self.portsCheck[port] = self.nr_packets
|
|
|
|
punts = self.vapi.punt_socket_dump(is_ip6=1)
|
|
self.assertEqual(len(punts), 0)
|
|
|
|
#
|
|
# expect ICMPv6 - destination unreachable for all packets
|
|
#
|
|
self.vapi.cli("clear trace")
|
|
self.pg0.add_stream(pkts)
|
|
self.pg_enable_capture(self.pg_interfaces)
|
|
self.pg_start()
|
|
# FIXME - when punt socket deregister is implemented
|
|
# rx = self.pg0.get_capture(self.nr_packets)
|
|
# for p in rx:
|
|
# self.assertEqual(int(p[IPv6].nh), 58) # ICMPv6
|
|
# self.assertEqual(int(p[ICMPv6DestUnreach].code),4) # unreachable
|
|
|
|
#
|
|
# configure a punt socket
|
|
#
|
|
self.socket_client_create(self.tempdir+"/socket_" + str(port))
|
|
self.vapi.punt_socket_register(port, self.tempdir+"/socket_" +
|
|
str(port), is_ip4=0)
|
|
punts = self.vapi.punt_socket_dump(is_ip6=1)
|
|
self.assertEqual(len(punts), 1)
|
|
|
|
self.logger.debug("Sending %s packets to port %d",
|
|
str(self.portsCheck[port]), port)
|
|
#
|
|
# expect punt socket and no packets on pg0
|
|
#
|
|
self.vapi.cli("clear errors")
|
|
self.vapi.cli("clear trace")
|
|
self.pg0.add_stream(pkts)
|
|
self.pg_enable_capture(self.pg_interfaces)
|
|
self.pg_start()
|
|
self.pg0.get_capture(0)
|
|
self.logger.info(self.vapi.cli("show trace"))
|
|
self.socket_client_close()
|
|
self.assertEqual(self.portsCheck[port], 0)
|
|
|
|
#
|
|
# remove punt socket. expect ICMP - dest. unreachable for all packets
|
|
#
|
|
self.vapi.punt_socket_deregister(port, is_ip4=0)
|
|
punts = self.vapi.punt_socket_dump(is_ip6=1)
|
|
self.assertEqual(len(punts), 0)
|
|
self.pg0.add_stream(pkts)
|
|
self.pg_enable_capture(self.pg_interfaces)
|
|
self.pg_start()
|
|
# FIXME - when punt socket deregister is implemented
|
|
# self.pg0.get_capture(nr_packets)
|
|
|
|
def test_punt_socket_traffic_multi_port_multi_sockets(self):
|
|
""" Punt socket traffic multi ports and multi sockets"""
|
|
|
|
for p in self.ports:
|
|
self.portsCheck[p] = 0
|
|
|
|
#
|
|
# create stream with random pakets count per given ports
|
|
#
|
|
pkts = list()
|
|
for _ in range(0, self.nr_packets):
|
|
# choose port from port list
|
|
p = random.choice(self.ports)
|
|
pkts.append((
|
|
Ether(src=self.pg0.remote_mac,
|
|
dst=self.pg0.local_mac) /
|
|
IPv6(src=self.pg0.remote_ip6, dst=self.pg0.local_ip6) /
|
|
inet6.UDP(sport=9876, dport=p) /
|
|
Raw('\xa5' * 100)))
|
|
self.portsCheck[p] += 1
|
|
#
|
|
# no punt socket
|
|
#
|
|
punts = self.vapi.punt_socket_dump(is_ip6=1)
|
|
self.assertEqual(len(punts), 0)
|
|
|
|
#
|
|
# configure a punt socket
|
|
#
|
|
for p in self.ports:
|
|
self.socket_client_create(self.tempdir+"/socket_" + str(p))
|
|
self.vapi.punt_socket_register(p, self.tempdir+"/socket_" + str(p),
|
|
is_ip4=0)
|
|
punts = self.vapi.punt_socket_dump(is_ip6=1)
|
|
self.assertEqual(len(punts), len(self.ports))
|
|
|
|
for p in self.ports:
|
|
self.logger.debug("Sending %s packets to port %d",
|
|
str(self.portsCheck[p]), p)
|
|
|
|
#
|
|
# expect punt socket and no packets on pg0
|
|
#
|
|
self.vapi.cli("clear errors")
|
|
self.vapi.cli("clear trace")
|
|
self.pg0.add_stream(pkts)
|
|
self.pg_enable_capture(self.pg_interfaces)
|
|
self.pg_start()
|
|
self.pg0.get_capture(0)
|
|
self.logger.info(self.vapi.cli("show trace"))
|
|
self.socket_client_close()
|
|
|
|
for p in self.ports:
|
|
self.assertEqual(self.portsCheck[p], 0)
|
|
self.vapi.punt_socket_deregister(p, is_ip4=0)
|
|
punts = self.vapi.punt_socket_dump(is_ip6=1)
|
|
self.assertEqual(len(punts), 0)
|
|
|
|
def test_punt_socket_traffic_multi_ports_single_socket(self):
|
|
""" Punt socket traffic multi ports and single socket"""
|
|
|
|
for p in self.ports:
|
|
self.portsCheck[p] = 0
|
|
|
|
#
|
|
# create stream with random pakets count per given ports
|
|
#
|
|
pkts = list()
|
|
for _ in range(0, self.nr_packets):
|
|
# choose port from port list
|
|
p = random.choice(self.ports)
|
|
pkts.append((
|
|
Ether(src=self.pg0.remote_mac,
|
|
dst=self.pg0.local_mac) /
|
|
IPv6(src=self.pg0.remote_ip6, dst=self.pg0.local_ip6) /
|
|
inet6.UDP(sport=9876, dport=p) /
|
|
Raw('\xa5' * 100)))
|
|
self.portsCheck[p] += 1
|
|
|
|
#
|
|
# no punt socket
|
|
#
|
|
punts = self.vapi.punt_socket_dump(is_ip6=1)
|
|
self.assertEqual(len(punts), 0)
|
|
|
|
#
|
|
# configure a punt socket
|
|
#
|
|
self.socket_client_create(self.tempdir+"/socket_multi")
|
|
for p in self.ports:
|
|
self.vapi.punt_socket_register(p, self.tempdir+"/socket_multi",
|
|
is_ip4=0)
|
|
punts = self.vapi.punt_socket_dump(is_ip6=1)
|
|
self.assertEqual(len(punts), len(self.ports))
|
|
|
|
for p in self.ports:
|
|
self.logger.debug("Send %s packets to port %d",
|
|
str(self.portsCheck[p]), p)
|
|
#
|
|
# expect punt socket and no packets on pg0
|
|
#
|
|
self.vapi.cli("clear errors")
|
|
self.vapi.cli("clear trace")
|
|
self.pg0.add_stream(pkts)
|
|
self.pg_enable_capture(self.pg_interfaces)
|
|
self.pg_start()
|
|
self.pg0.get_capture(0)
|
|
self.logger.info(self.vapi.cli("show trace"))
|
|
self.socket_client_close()
|
|
|
|
for p in self.ports:
|
|
self.assertEqual(self.portsCheck[p], 0)
|
|
self.vapi.punt_socket_deregister(p, is_ip4=0)
|
|
punts = self.vapi.punt_socket_dump(is_ip6=1)
|
|
self.assertEqual(len(punts), 0)
|
|
|
|
if __name__ == '__main__':
|
|
unittest.main(testRunner=VppTestRunner)
|