
Type: improvement Change-Id: I7ca860dbee0d0a24b7f00943142d8c878ed90e80 Signed-off-by: Hadi Rayan Al-Sandid <halsandi@cisco.com>
197 lines
6.5 KiB
Python
197 lines
6.5 KiB
Python
from framework import VppTestCase
|
|
from asfframework import VppTestRunner
|
|
from vpp_papi_provider import CliFailedCommandError
|
|
import os
|
|
import unittest
|
|
from config import config
|
|
from scapy.layers.l2 import Ether
|
|
from scapy.layers.inet import IP, UDP
|
|
from random import randint
|
|
|
|
|
|
@unittest.skipIf(
|
|
"bpf_trace_filter" in config.excluded_plugins,
|
|
"Exclude BPF Trace Filter plugin tests",
|
|
)
|
|
class TestBpfTraceFilter(VppTestCase):
|
|
"""BPF Trace filter test"""
|
|
|
|
@classmethod
|
|
def setUpClass(cls):
|
|
super(TestBpfTraceFilter, cls).setUpClass()
|
|
try:
|
|
cls.create_pg_interfaces(range(2))
|
|
for i in cls.pg_interfaces:
|
|
i.config_ip4()
|
|
i.resolve_arp()
|
|
i.admin_up()
|
|
except Exception:
|
|
cls.tearDownClass()
|
|
raise
|
|
|
|
@classmethod
|
|
def tearDownClass(cls):
|
|
for i in cls.pg_interfaces:
|
|
i.unconfig_ip4()
|
|
i.admin_down()
|
|
super(TestBpfTraceFilter, cls).tearDownClass()
|
|
|
|
# reset trace filter before each test
|
|
def setUp(self):
|
|
super(TestBpfTraceFilter, self).setUp()
|
|
self.vapi.cli("set trace filter function vnet_is_packet_traced")
|
|
self.vapi.cli("clear trace")
|
|
|
|
def create_stream(self, src_if, dst_if, count):
|
|
packets = []
|
|
for i in range(count):
|
|
info = self.create_packet_info(src_if, dst_if)
|
|
payload = self.info_to_payload(info)
|
|
p = (
|
|
Ether(dst=src_if.local_mac, src=src_if.remote_mac)
|
|
/ IP(src=src_if.remote_ip4, dst=dst_if.remote_ip4)
|
|
/ UDP(sport=randint(49152, 65535), dport=5678 + i)
|
|
)
|
|
info.data = p.copy()
|
|
packets.append(p)
|
|
return packets
|
|
|
|
def test_bpf_trace_filter_cli(self):
|
|
"""BPF Trace filter test [CLI]"""
|
|
self.vapi.cli("set bpf trace filter {{tcp}}")
|
|
self.vapi.cli("set trace filter function bpf_trace_filter")
|
|
|
|
packets = self.create_stream(self.pg0, self.pg1, 3)
|
|
self.pg0.add_stream(packets)
|
|
self.pg_start(traceFilter=True)
|
|
|
|
# verify that bpf trace filter has been selected
|
|
reply = self.vapi.cli("show trace filter function")
|
|
self.assertIn(
|
|
"(*) name:bpf_trace_filter", reply, "BPF Trace filter is not selected"
|
|
)
|
|
|
|
# verify that trace is empty
|
|
reply = self.vapi.cli("show trace")
|
|
self.assertIn(
|
|
"No packets in trace buffer",
|
|
reply,
|
|
"Unexpected packets in the trace buffer",
|
|
)
|
|
|
|
# verify that bpf trace filter has been selected for pcap
|
|
self.vapi.cli("set pcap filter function bpf_trace_filter")
|
|
self.vapi.cli("pcap trace rx tx max 1000 intfc any filter")
|
|
|
|
# verify that packets are filtered in not captured in pcap
|
|
self.pg0.add_stream(packets)
|
|
self.pg_start()
|
|
|
|
with self.assertRaises(CliFailedCommandError):
|
|
self.vapi.cli("pcap trace rx tx off")
|
|
|
|
reply = self.vapi.cli("show bpf trace filter")
|
|
self.assertIn("(000)", reply, "Unexpected bpf filter dump")
|
|
|
|
def test_bpf_trace_filter_vapi(self):
|
|
"""BPF Trace filter test [VAPI]"""
|
|
self.vapi.bpf_trace_filter_set(filter="tcp")
|
|
self.vapi.trace_set_filter_function(filter_function_name="bpf_trace_filter")
|
|
|
|
packets = self.create_stream(self.pg0, self.pg1, 3)
|
|
self.pg0.add_stream(packets)
|
|
self.pg_start(traceFilter=True)
|
|
|
|
# verify that bpf trace filter has been selected
|
|
reply = self.vapi.cli("show trace filter function")
|
|
self.assertIn(
|
|
"(*) name:bpf_trace_filter", reply, "BPF Trace filter is not selected"
|
|
)
|
|
|
|
# verify that trace is empty
|
|
reply = self.vapi.cli("show trace")
|
|
self.assertIn(
|
|
"No packets in trace buffer",
|
|
reply,
|
|
"Unexpected packets in the trace buffer",
|
|
)
|
|
|
|
# verify that bpf trace filter has been selected for pcap
|
|
self.vapi.pcap_set_filter_function(filter_function_name="bpf_trace_filter")
|
|
reply = self.vapi.cli("show pcap filter function")
|
|
self.assertIn(
|
|
"(*) name:bpf_trace_filter",
|
|
reply,
|
|
"BPF Trace filter is not selected for Pcap",
|
|
)
|
|
|
|
# verify that packets are filtered in not captured in pcap
|
|
self.vapi.pcap_trace_on(
|
|
capture_rx=True,
|
|
capture_tx=True,
|
|
max_packets=1000,
|
|
filter=True,
|
|
sw_if_index=0,
|
|
filename="bpf_trace.pcap",
|
|
)
|
|
self.pg0.add_stream(packets)
|
|
self.pg_start()
|
|
with self.vapi.assert_negative_api_retval():
|
|
self.vapi.pcap_trace_off()
|
|
|
|
def test_bpf_trace_filter_vapi_v2(self):
|
|
"""BPF Trace filter test [VAPI v2]"""
|
|
self.vapi.bpf_trace_filter_set_v2(filter="tcp or dst port 5678")
|
|
self.vapi.trace_set_filter_function(filter_function_name="bpf_trace_filter")
|
|
|
|
packets = self.create_stream(self.pg0, self.pg1, 3)
|
|
self.pg0.add_stream(packets)
|
|
self.pg_start(traceFilter=True)
|
|
|
|
# verify that bpf trace filter has been selected
|
|
reply = self.vapi.cli("show trace filter function")
|
|
self.assertIn(
|
|
"(*) name:bpf_trace_filter", reply, "BPF Trace filter is not selected"
|
|
)
|
|
|
|
# verify that trace is filtered
|
|
reply = self.vapi.cli("show trace")
|
|
self.assertIn(
|
|
"Packet 1\n",
|
|
reply,
|
|
"No expected packets in the trace buffer",
|
|
)
|
|
self.assertNotIn(
|
|
"Packet 2\n",
|
|
reply,
|
|
"Unexpected packets in the trace buffer",
|
|
)
|
|
|
|
# verify that bpf trace filter has been selected for pcap
|
|
self.vapi.pcap_set_filter_function(filter_function_name="bpf_trace_filter")
|
|
reply = self.vapi.cli("show pcap filter function")
|
|
self.assertIn(
|
|
"(*) name:bpf_trace_filter",
|
|
reply,
|
|
"BPF Trace filter is not selected for Pcap",
|
|
)
|
|
|
|
# verify that packets are captured in pcap with filter
|
|
self.vapi.pcap_trace_on(
|
|
capture_rx=True,
|
|
capture_tx=True,
|
|
max_packets=1000,
|
|
filter=True,
|
|
sw_if_index=0,
|
|
filename="bpf_trace.pcap",
|
|
)
|
|
self.pg0.add_stream(packets)
|
|
self.pg_start()
|
|
self.vapi.pcap_trace_off()
|
|
self.assertTrue(os.path.exists("/tmp/bpf_trace.pcap"))
|
|
os.remove("/tmp/bpf_trace.pcap")
|
|
|
|
|
|
if __name__ == "__main__":
|
|
unittest.main(testRunner=VppTestRunner)
|