tests: Added bpf trace filter plugin test
Type: test Change-Id: I026d9298fe1372d03f61b6ad57c82628bab4c831 Signed-off-by: adrianvillin <avillin@cisco.com>
This commit is contained in:

committed by
Mohammed HAWARI

parent
04d262d1eb
commit
c6fe617490
@ -132,14 +132,14 @@ class VppTestCase(VppAsfTestCase):
|
||||
cls._pcaps.append((intf, worker))
|
||||
|
||||
@classmethod
|
||||
def pg_start(cls, trace=True):
|
||||
def pg_start(cls, trace=True, traceFilter=False):
|
||||
"""Enable the PG, wait till it is done, then clean up"""
|
||||
for intf, worker in cls._old_pcaps:
|
||||
intf.remove_old_pcap_file(intf.get_in_path(worker))
|
||||
cls._old_pcaps = []
|
||||
if trace:
|
||||
cls.vapi.cli("clear trace")
|
||||
cls.vapi.cli("trace add pg-input 1000")
|
||||
cls.vapi.cli("trace add pg-input 1000" + (" filter" if traceFilter else ""))
|
||||
cls.vapi.cli("packet-generator enable")
|
||||
# PG, when starts, runs to completion -
|
||||
# so let's avoid a race condition,
|
||||
|
105
test/test_bpf_trace_filter.py
Normal file
105
test/test_bpf_trace_filter.py
Normal file
@ -0,0 +1,105 @@
|
||||
from framework import VppTestCase
|
||||
from asfframework import VppTestRunner
|
||||
import unittest
|
||||
from config import config
|
||||
from scapy.layers.l2 import Ether
|
||||
from scapy.layers.inet import IP, UDP
|
||||
from random import randint
|
||||
|
||||
|
||||
@unittest.skipIf(
|
||||
"bpf_trace_filter" in config.excluded_plugins,
|
||||
"Exclude BPF Trace Filter plugin tests",
|
||||
)
|
||||
class TestBpfTraceFilter(VppTestCase):
|
||||
"""BPF Trace filter test"""
|
||||
|
||||
@classmethod
|
||||
def setUpClass(cls):
|
||||
super(TestBpfTraceFilter, cls).setUpClass()
|
||||
try:
|
||||
cls.create_pg_interfaces(range(2))
|
||||
for i in cls.pg_interfaces:
|
||||
i.config_ip4()
|
||||
i.resolve_arp()
|
||||
i.admin_up()
|
||||
except Exception:
|
||||
cls.tearDownClass()
|
||||
raise
|
||||
|
||||
@classmethod
|
||||
def tearDownClass(cls):
|
||||
for i in cls.pg_interfaces:
|
||||
i.unconfig_ip4()
|
||||
i.admin_down()
|
||||
super(TestBpfTraceFilter, cls).tearDownClass()
|
||||
|
||||
# reset trace filter before each test
|
||||
def setUp(self):
|
||||
super(TestBpfTraceFilter, self).setUp()
|
||||
self.vapi.cli("set trace filter function vnet_is_packet_traced")
|
||||
self.vapi.cli("clear trace")
|
||||
|
||||
def create_stream(self, src_if, dst_if, count):
|
||||
packets = []
|
||||
for i in range(count):
|
||||
info = self.create_packet_info(src_if, dst_if)
|
||||
payload = self.info_to_payload(info)
|
||||
p = (
|
||||
Ether(dst=src_if.local_mac, src=src_if.remote_mac)
|
||||
/ IP(src=src_if.remote_ip4, dst=dst_if.remote_ip4)
|
||||
/ UDP(sport=randint(49152, 65535), dport=5678)
|
||||
)
|
||||
info.data = p.copy()
|
||||
packets.append(p)
|
||||
return packets
|
||||
|
||||
def test_bpf_trace_filter_cli(self):
|
||||
"""BPF Trace filter test [CLI]"""
|
||||
self.vapi.cli("set bpf trace filter {{tcp}}")
|
||||
self.vapi.cli("set trace filter function bpf_trace_filter")
|
||||
|
||||
packets = self.create_stream(self.pg0, self.pg1, 3)
|
||||
self.pg0.add_stream(packets)
|
||||
self.pg_start(traceFilter=True)
|
||||
|
||||
# verify that bpf trace filter has been selected
|
||||
reply = self.vapi.cli("show trace filter function")
|
||||
self.assertIn(
|
||||
"(*) name:bpf_trace_filter", reply, "BPF Trace filter is not selected"
|
||||
)
|
||||
|
||||
# verify that trace is empty
|
||||
reply = self.vapi.cli("show trace")
|
||||
self.assertIn(
|
||||
"No packets in trace buffer",
|
||||
reply,
|
||||
"Unexpected packets in the trace buffer",
|
||||
)
|
||||
|
||||
def test_bpf_trace_filter_vapi(self):
|
||||
"""BPF Trace filter test [VAPI]"""
|
||||
self.vapi.bpf_trace_filter_set(filter="tcp")
|
||||
self.vapi.trace_set_filter_function(filter_function_name="bpf_trace_filter")
|
||||
|
||||
packets = self.create_stream(self.pg0, self.pg1, 3)
|
||||
self.pg0.add_stream(packets)
|
||||
self.pg_start(traceFilter=True)
|
||||
|
||||
# verify that bpf trace filter has been selected
|
||||
reply = self.vapi.cli("show trace filter function")
|
||||
self.assertIn(
|
||||
"(*) name:bpf_trace_filter", reply, "BPF Trace filter is not selected"
|
||||
)
|
||||
|
||||
# verify that trace is empty
|
||||
reply = self.vapi.cli("show trace")
|
||||
self.assertIn(
|
||||
"No packets in trace buffer",
|
||||
reply,
|
||||
"Unexpected packets in the trace buffer",
|
||||
)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
unittest.main(testRunner=VppTestRunner)
|
Reference in New Issue
Block a user