classify: Large and nested classifer unit tests

Type: test

Large and nested unit tests to test AVX-512 optimized versions of the classify
hash and match algorithims.

Signed-off-by: Ray Kinsella <mdr@ashroe.eu>
Change-Id: Ie423fee5e0fd1cb4bdf3bec8e0230a5f7cfc75fc
This commit is contained in:
Ray Kinsella
2021-09-22 11:24:06 +01:00
committed by Damjan Marion
parent c022b2fe39
commit b8165b96f5
3 changed files with 352 additions and 20 deletions

View File

@ -5,12 +5,12 @@ import socket
import unittest
from framework import VppTestCase, VppTestRunner
from scapy.packet import Raw, Packet
from scapy.packet import Raw
from scapy.layers.l2 import Ether
from scapy.layers.inet import IP, UDP, TCP
from util import ppp
from template_classifier import TestClassifier
from template_classifier import TestClassifier, VarMask, VarMatch
from vpp_ip_route import VppIpRoute, VppRoutePath
from vpp_ip import INVALID_INDEX
@ -504,6 +504,284 @@ class TestClassifierMAC(TestClassifier):
self.pg3.assert_nothing_captured(remark="packets forwarded")
class TestClassifierComplex(TestClassifier):
""" Large & Nested Classifiers Test Cases """
@classmethod
def setUpClass(cls):
super(TestClassifierComplex, cls).setUpClass()
@classmethod
def tearDownClass(cls):
super(TestClassifierComplex, cls).tearDownClass()
def test_iacl_large(self):
""" Large input ACL test
Test scenario for Large ACL matching on ethernet+ip+udp headers
- Create IPv4 stream for pg0 -> pg1 interface.
- Create large acl matching on ethernet+ip+udp header fields
- Send and verify received packets on pg1 interface.
"""
# 40b offset = 80bytes - (sizeof(UDP/IP/ETH) + 4b)
# + 4b as build_ip_ma*() func, do not match against UDP Len & Chksum
msk = VarMask(offset=40, spec='ffff')
mth = VarMatch(offset=40, value=0x1234, length=2)
payload_msk = self.build_payload_mask([msk])
payload_match = self.build_payload_match([mth])
sport = 13720
dport = 9080
# 36b offset = 80bytes - (sizeof(UDP/IP/ETH))
packet_ex = bytes.fromhex(('0' * 36) + '1234')
pkts = self.create_stream(self.pg0, self.pg1, self.pg_if_packet_sizes,
UDP(sport=sport, dport=dport),
packet_ex)
self.pg0.add_stream(pkts)
key = 'large_in'
self.create_classify_table(
key,
self.build_mac_mask(src_mac='ffffffffffff',
dst_mac='ffffffffffff',
ether_type='ffff') +
self.build_ip_mask(proto='ff',
src_ip='ffffffff',
dst_ip='ffffffff',
src_port='ffff',
dst_port='ffff') +
payload_msk,
data_offset=-14)
self.create_classify_session(
self.acl_tbl_idx.get(key),
self.build_mac_match(src_mac=self.pg0.remote_mac,
dst_mac=self.pg0.local_mac,
# ipv4 next header
ether_type='0800') +
self.build_ip_match(proto=socket.IPPROTO_UDP,
src_ip=self.pg0.remote_ip4,
dst_ip=self.pg1.remote_ip4,
src_port=sport,
dst_port=dport) +
payload_match
)
self.input_acl_set_interface(self.pg0, self.acl_tbl_idx.get(key))
self.acl_active_table = key
self.pg_enable_capture(self.pg_interfaces)
self.pg_start()
pkts = self.pg1.get_capture(len(pkts))
self.verify_capture(self.pg1, pkts)
self.pg0.assert_nothing_captured(remark="packets forwarded")
self.pg2.assert_nothing_captured(remark="packets forwarded")
self.pg3.assert_nothing_captured(remark="packets forwarded")
def test_oacl_large(self):
""" Large output ACL test
Test scenario for Large ACL matching on ethernet+ip+udp headers
- Create IPv4 stream for pg1 -> pg0 interface.
- Create large acl matching on ethernet+ip+udp header fields
- Send and verify received packets on pg0 interface.
"""
# 40b offset = 80bytes - (sizeof(UDP/IP/ETH) + 4b)
# + 4b as build_ip_ma*() func, do not match against UDP Len & Chksum
msk = VarMask(offset=40, spec='ffff')
mth = VarMatch(offset=40, value=0x1234, length=2)
payload_msk = self.build_payload_mask([msk])
payload_match = self.build_payload_match([mth])
sport = 13720
dport = 9080
# 36b offset = 80bytes - (sizeof(UDP/IP/ETH))
packet_ex = bytes.fromhex(('0' * 36) + '1234')
pkts = self.create_stream(self.pg1, self.pg0, self.pg_if_packet_sizes,
UDP(sport=sport, dport=dport),
packet_ex)
self.pg1.add_stream(pkts)
key = 'large_out'
self.create_classify_table(
key,
self.build_mac_mask(src_mac='ffffffffffff',
dst_mac='ffffffffffff',
ether_type='ffff') +
self.build_ip_mask(proto='ff',
src_ip='ffffffff',
dst_ip='ffffffff',
src_port='ffff',
dst_port='ffff') +
payload_msk,
data_offset=-14)
self.create_classify_session(
self.acl_tbl_idx.get(key),
self.build_mac_match(src_mac=self.pg0.local_mac,
dst_mac=self.pg0.remote_mac,
# ipv4 next header
ether_type='0800') +
self.build_ip_match(proto=socket.IPPROTO_UDP,
src_ip=self.pg1.remote_ip4,
dst_ip=self.pg0.remote_ip4,
src_port=sport,
dst_port=dport) +
payload_match)
self.output_acl_set_interface(self.pg0, self.acl_tbl_idx.get(key))
self.acl_active_table = key
self.pg_enable_capture(self.pg_interfaces)
self.pg_start()
pkts = self.pg0.get_capture(len(pkts))
self.verify_capture(self.pg0, pkts)
self.pg1.assert_nothing_captured(remark="packets forwarded")
self.pg2.assert_nothing_captured(remark="packets forwarded")
self.pg3.assert_nothing_captured(remark="packets forwarded")
def test_iacl_nested(self):
""" Nested input ACL test
Test scenario for Large ACL matching on ethernet+ip+udp headers
- Create IPv4 stream for pg0 -> pg1 interface.
- Create 1st classifier table, without any entries
- Create nested acl matching on ethernet+ip+udp header fields
- Send and verify received packets on pg1 interface.
"""
sport = 13720
dport = 9080
pkts = self.create_stream(self.pg0, self.pg1, self.pg_if_packet_sizes,
UDP(sport=sport, dport=dport))
self.pg0.add_stream(pkts)
subtable_key = 'subtable_in'
self.create_classify_table(
subtable_key,
self.build_mac_mask(src_mac='ffffffffffff',
dst_mac='ffffffffffff',
ether_type='ffff') +
self.build_ip_mask(proto='ff',
src_ip='ffffffff',
dst_ip='ffffffff',
src_port='ffff',
dst_port='ffff'),
data_offset=-14)
key = 'nested_in'
self.create_classify_table(
key,
self.build_mac_mask(src_mac='ffffffffffff',
dst_mac='ffffffffffff',
ether_type='ffff') +
self.build_ip_mask(proto='ff',
src_ip='ffffffff',
dst_ip='ffffffff',
src_port='ffff',
dst_port='ffff'),
next_table_index=self.acl_tbl_idx.get(subtable_key))
self.create_classify_session(
self.acl_tbl_idx.get(subtable_key),
self.build_mac_match(src_mac=self.pg0.remote_mac,
dst_mac=self.pg0.local_mac,
# ipv4 next header
ether_type='0800') +
self.build_ip_match(proto=socket.IPPROTO_UDP,
src_ip=self.pg0.remote_ip4,
dst_ip=self.pg1.remote_ip4,
src_port=sport,
dst_port=dport))
self.input_acl_set_interface(self.pg0, self.acl_tbl_idx.get(key))
self.acl_active_table = key
self.pg_enable_capture(self.pg_interfaces)
self.pg_start()
pkts = self.pg1.get_capture(len(pkts))
self.verify_capture(self.pg1, pkts)
self.pg0.assert_nothing_captured(remark="packets forwarded")
self.pg2.assert_nothing_captured(remark="packets forwarded")
self.pg3.assert_nothing_captured(remark="packets forwarded")
def test_oacl_nested(self):
""" Nested output ACL test
Test scenario for Large ACL matching on ethernet+ip+udp headers
- Create IPv4 stream for pg1 -> pg0 interface.
- Create 1st classifier table, without any entries
- Create nested acl matching on ethernet+ip+udp header fields
- Send and verify received packets on pg0 interface.
"""
sport = 13720
dport = 9080
pkts = self.create_stream(self.pg1, self.pg0, self.pg_if_packet_sizes,
UDP(sport=sport, dport=dport))
self.pg1.add_stream(pkts)
subtable_key = 'subtable_out'
self.create_classify_table(
subtable_key,
self.build_mac_mask(src_mac='ffffffffffff',
dst_mac='ffffffffffff',
ether_type='ffff') +
self.build_ip_mask(proto='ff',
src_ip='ffffffff',
dst_ip='ffffffff',
src_port='ffff',
dst_port='ffff'),
data_offset=-14)
key = 'nested_out'
self.create_classify_table(
key,
self.build_mac_mask(src_mac='ffffffffffff',
dst_mac='ffffffffffff',
ether_type='ffff') +
self.build_ip_mask(proto='ff',
src_ip='ffffffff',
dst_ip='ffffffff',
src_port='ffff',
dst_port='ffff'),
next_table_index=self.acl_tbl_idx.get(subtable_key),
data_offset=-14)
self.create_classify_session(
self.acl_tbl_idx.get(subtable_key),
self.build_mac_match(src_mac=self.pg0.local_mac,
dst_mac=self.pg0.remote_mac,
# ipv4 next header
ether_type='0800') +
self.build_ip_match(proto=socket.IPPROTO_UDP,
src_ip=self.pg1.remote_ip4,
dst_ip=self.pg0.remote_ip4,
src_port=sport,
dst_port=dport))
self.output_acl_set_interface(self.pg0, self.acl_tbl_idx.get(key))
self.acl_active_table = key
self.pg_enable_capture(self.pg_interfaces)
self.pg_start()
pkts = self.pg0.get_capture(len(pkts))
self.verify_capture(self.pg0, pkts)
self.pg1.assert_nothing_captured(remark="packets forwarded")
self.pg2.assert_nothing_captured(remark="packets forwarded")
self.pg3.assert_nothing_captured(remark="packets forwarded")
class TestClassifierPBR(TestClassifier):
""" Classifier PBR Test Case """