7f9b7f9f49
Continuation/Part 2 of https://gerrit.fd.io/r/#/c/17092/ Change-Id: Id0122d84eaf2c05d29e5be63a594d5e528ee7c9a Signed-off-by: Paul Vinciguerra <pvinci@vinciconsulting.com>
688 lines
27 KiB
Python
688 lines
27 KiB
Python
#!/usr/bin/env python
|
|
""" Classifier-based L2 ACL Test Case HLD:
|
|
"""
|
|
|
|
import unittest
|
|
import random
|
|
import binascii
|
|
import socket
|
|
|
|
|
|
from scapy.packet import Raw
|
|
from scapy.data import ETH_P_IP
|
|
from scapy.layers.l2 import Ether
|
|
from scapy.layers.inet import IP, TCP, UDP, ICMP
|
|
from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest
|
|
from scapy.layers.inet6 import IPv6ExtHdrFragment
|
|
from framework import VppTestCase, VppTestRunner
|
|
from util import Host, ppp
|
|
|
|
|
|
class TestClassifyAcl(VppTestCase):
    """ Classifier-based L2 input and output ACL Test Case """

    # traffic types (used as the ``traffic_type`` argument and as the outer
    # index into ``proto`` below)
    IP = 0
    ICMP = 1

    # IP version (used as the ``ip_type`` / ``ipv6`` argument)
    IPRANDOM = -1
    IPV4 = 0
    IPV6 = 1

    # rule types
    DENY = 0
    PERMIT = 1

    # supported protocols
    # proto[IP] holds the protocol numbers named TCP/UDP in proto_map,
    # proto[ICMP] holds the ones named ICMP/ICMPv6EchoRequest.
    proto = [[6, 17], [1, 58]]
    # protocol number -> name of the scapy layer class used for that protocol
    proto_map = {1: 'ICMP', 58: 'ICMPv6EchoRequest', 6: 'TCP', 17: 'UDP'}
    # NOTE(review): the five selectors below look like indexes into the inner
    # lists of ``proto``; confirm against their users.
    ICMPv4 = 0
    ICMPv6 = 1
    TCP = 0
    UDP = 1
    PROTO_ALL = 0

    # port ranges
    PORTS_ALL = -1
    PORTS_RANGE = 0
    PORTS_RANGE_2 = 1
    # first range: bounds passed to random.randint() when ports are randomised
    udp_sport_from = 10
    udp_sport_to = udp_sport_from + 5
    udp_dport_from = 20000
    udp_dport_to = udp_dport_from + 5000
    tcp_sport_from = 30
    tcp_sport_to = tcp_sport_from + 5
    tcp_dport_from = 40000
    tcp_dport_to = tcp_dport_from + 5000

    # second range
    udp_sport_from_2 = 90
    udp_sport_to_2 = udp_sport_from_2 + 5
    udp_dport_from_2 = 30000
    udp_dport_to_2 = udp_dport_from_2 + 5000
    tcp_sport_from_2 = 130
    tcp_sport_to_2 = tcp_sport_from_2 + 5
    tcp_dport_from_2 = 20000
    tcp_dport_to_2 = tcp_dport_from_2 + 5000

    # ICMP type/code placed into generated ICMP packets and checked on capture
    icmp4_type = 8  # echo request
    icmp4_code = 3
    icmp6_type = 128  # echo request
    icmp6_code = 3

    # second set of ICMP type/code values (code given as a from/to range)
    icmp4_type_2 = 8
    icmp4_code_from_2 = 5
    icmp4_code_to_2 = 20
    icmp6_type_2 = 128
    icmp6_code_from_2 = 8
    icmp6_code_to_2 = 42

    # Test variables
    # id of the bridge domain that all pg interfaces are put into
    bd_id = 1
|
|
|
|
@classmethod
def setUpClass(cls):
    """
    Run the base-class setup, then build the L2 topology for the tests.

    Two pg interfaces are created, placed into bridge domain ``cls.bd_id``
    and brought up; the per-interface host bookkeeping and the active
    classify table marker are initialised. If any step raises, the
    base-class tearDownClass is invoked and the exception is re-raised.
    """
    super(TestClassifyAcl, cls).setUpClass()

    try:
        # Two packet-generator interfaces: pg0 and pg1
        cls.create_pg_interfaces(range(2))

        # Packet flows: traffic is generated on pg0 and expected on pg1
        cls.flows = {cls.pg0: [cls.pg1]}

        # Sizes that generated packets are randomly extended to
        cls.pg_if_packet_sizes = [64, 512, 1518, 9018]

        # Bridge domain created with learn=1 and uu_flood=1; every pg
        # interface is attached to it
        cls.vapi.bridge_domain_add_del(
            bd_id=cls.bd_id, uu_flood=1, learn=1)
        for bridged_if in cls.pg_interfaces:
            cls.vapi.sw_interface_set_l2_bridge(
                rx_sw_if_index=bridged_if.sw_if_index, bd_id=cls.bd_id)

        # Bring all interfaces up
        for pg_if in cls.pg_interfaces:
            pg_if.admin_up()

        # sw_if_index -> list of test hosts behind that interface
        cls.hosts_by_pg_idx = dict(
            (pg_if.sw_if_index, []) for pg_if in cls.pg_interfaces)

        # sw_if_index -> list of deleted hosts
        cls.deleted_hosts_by_pg_idx = dict(
            (pg_if.sw_if_index, []) for pg_if in cls.pg_interfaces)

        # Key of the classify table currently applied ('' = none)
        cls.acl_active_table = ''

    except Exception:
        super(TestClassifyAcl, cls).tearDownClass()
        raise
|
|
|
|
@classmethod
def tearDownClass(cls):
    """Delegate class-level cleanup to the base class."""
    super(TestClassifyAcl, cls).tearDownClass()
|
|
|
|
def setUp(self):
    """Run the base-class setUp and start each test with clean state."""
    super(TestClassifyAcl, self).setUp()

    # classify table key -> table index (filled by create_classify_table)
    self.acl_tbl_idx = dict()
    self.reset_packet_infos()
|
|
|
|
def tearDown(self):
    """
    Log debug state after each test and detach the active ACL table.

    While VPP is alive, the ACL, classify table and bridge-domain CLI
    output is logged. The table recorded in ``acl_active_table`` is then
    removed from pg1 (output ACL) and/or pg0 (input ACL) depending on its
    key, and the marker is cleared.
    """
    if not self.vpp_dead:
        show_commands = (
            "show inacl type l2",
            "show outacl type l2",
            "show classify tables verbose",
            "show bridge-domain %s detail" % self.bd_id,
        )
        for command in show_commands:
            self.logger.info(self.vapi.ppcli(command))

        active = self.acl_active_table
        if active in ('mac_inout', 'mac_out', 'mac_in'):
            # 'mac_inout' and 'mac_out' were applied on the pg1 output
            if active != 'mac_in':
                self.output_acl_set_interface(
                    self.pg1, self.acl_tbl_idx.get(active), 0)
            # 'mac_inout' and 'mac_in' were applied on the pg0 input
            if active != 'mac_out':
                self.input_acl_set_interface(
                    self.pg0, self.acl_tbl_idx.get(active), 0)
            self.acl_active_table = ''

    super(TestClassifyAcl, self).tearDown()
|
|
|
|
@staticmethod
def build_mac_mask(dst_mac='', src_mac='', ether_type=''):
    """Build MAC ACL mask data with hexstring format

    The three fields are zero-padded on the left to 12, 12 and 4 hex
    digits and concatenated as dst MAC, src MAC, ethernet type. Trailing
    zeros are then dropped.

    :param str dst_mac: destination MAC address <0-ffffffffffff>
    :param str src_mac: source MAC address <0-ffffffffffff>
    :param str ether_type: ethernet type <0-ffff>
    :return: hex string with an even number of digits ('' when every
        field is empty or zero).
    """
    mask = '{!s:0>12}{!s:0>12}{!s:0>4}'.format(
        dst_mac, src_mac, ether_type).rstrip('0')
    # Stripping single zeros can cut a byte in half (e.g. '...f0' ->
    # '...f'); restore the low nibble so the result still describes whole
    # bytes and can be passed to binascii.unhexlify().
    if len(mask) % 2:
        mask += '0'
    return mask
|
|
|
|
@staticmethod
def build_mac_match(dst_mac='', src_mac='', ether_type=''):
    """Build MAC ACL match data with hexstring format

    Colons are removed from the MAC addresses; the three fields are then
    zero-padded on the left to 12, 12 and 4 hex digits and concatenated
    as dst MAC, src MAC, ethernet type. Trailing zeros are dropped.

    :param str dst_mac: destination MAC address <x:x:x:x:x:x>
    :param str src_mac: source MAC address <x:x:x:x:x:x>
    :param str ether_type: ethernet type <0-ffff>
    :return: hex string with an even number of digits ('' when every
        field is empty or zero).
    """
    if dst_mac:
        dst_mac = dst_mac.replace(':', '')
    if src_mac:
        src_mac = src_mac.replace(':', '')

    match = '{!s:0>12}{!s:0>12}{!s:0>4}'.format(
        dst_mac, src_mac, ether_type).rstrip('0')
    # A value such as ...:00:10 loses only the low nibble of its last byte
    # when zeros are stripped; pad it back so the string has whole bytes
    # and binascii.unhexlify() accepts it.
    if len(match) % 2:
        match += '0'
    return match
|
|
|
|
def create_classify_table(self, key, mask, data_offset=0, is_add=1):
    """Create Classify Table

    The table index returned by the API is stored in
    ``self.acl_tbl_idx[key]``.

    :param str key: key for classify table (ex, ACL name).
    :param str mask: mask value for interested traffic, as a hex string.
    :param int data_offset: value passed as current_data_offset.
    :param int is_add: option to configure classify table.
        - create(1) or delete(0)
    """
    mask_bytes = binascii.unhexlify(mask)
    # one match vector per started group of 32 hex digits of the mask
    n_vectors = (len(mask) - 1) // 32 + 1
    reply = self.vapi.classify_add_del_table(
        is_add,
        mask_bytes,
        match_n_vectors=n_vectors,
        miss_next_index=0,
        current_data_flag=1,
        current_data_offset=data_offset)
    self.assertIsNotNone(reply, 'No response msg for add_del_table')
    self.acl_tbl_idx[key] = reply.new_table_index
|
|
|
|
def create_classify_session(self, intf, table_index, match,
                            hit_next_index=0xffffffff, is_add=1):
    """Create Classify Session

    :param VppInterface intf: Interface to apply classify session
        (accepted for symmetry with the callers; not used here).
    :param int table_index: table index to identify classify table.
    :param str match: matched value for interested traffic, hex string.
    :param int hit_next_index: value passed as hit_next_index.
    :param int is_add: option to configure classify session.
        - create(1) or delete(0)
    """
    match_bytes = binascii.unhexlify(match)
    reply = self.vapi.classify_add_del_session(
        is_add, table_index, match_bytes, hit_next_index=hit_next_index)
    self.assertIsNotNone(reply, 'No response msg for add_del_session')
|
|
|
|
def input_acl_set_interface(self, intf, table_index, is_add=1):
    """Configure Input ACL interface

    :param VppInterface intf: Interface to apply Input ACL feature.
    :param int table_index: table index to identify classify table
        (passed as the L2 table index).
    :param int is_add: option to configure classify session.
        - enable(1) or disable(0)
    """
    sw_if_index = intf.sw_if_index
    reply = self.vapi.input_acl_set_interface(
        is_add, sw_if_index, l2_table_index=table_index)
    self.assertIsNotNone(reply, 'No response msg for acl_set_interface')
|
|
|
|
def output_acl_set_interface(self, intf, table_index, is_add=1):
    """Configure Output ACL interface

    :param VppInterface intf: Interface to apply Output ACL feature.
    :param int table_index: table index to identify classify table
        (passed as the L2 table index).
    :param int is_add: option to configure classify session.
        - enable(1) or disable(0)
    """
    sw_if_index = intf.sw_if_index
    reply = self.vapi.output_acl_set_interface(
        is_add, sw_if_index, l2_table_index=table_index)
    self.assertIsNotNone(reply, 'No response msg for acl_set_interface')
|
|
|
|
def create_hosts(self, count, start=0):
    """
    Create required number of host MAC addresses and distribute them among
    interfaces. Create host IPv4 and IPv6 addresses for every host MAC
    address.

    Hosts are appended to ``self.hosts_by_pg_idx[sw_if_index]``. Every
    interface gets ``count // n_interfaces`` hosts; the last interface
    additionally receives the remainder.

    :param int count: Number of hosts to create MAC/IP addresses for.
    :param int start: Number to start numbering from.
    """
    n_int = len(self.pg_interfaces)
    # Floor division keeps the share an int, so it can be used with
    # range() on Python 3 as well as Python 2.
    macs_per_if = count // n_int
    for i, pg_if in enumerate(self.pg_interfaces):
        start_nr = macs_per_if * i + start
        if i == n_int - 1:
            # last interface takes whatever is left up to ``count``
            end_nr = count + start
        else:
            end_nr = macs_per_if * (i + 1) + start
        hosts = self.hosts_by_pg_idx[pg_if.sw_if_index]
        for j in range(start_nr, end_nr):
            host = Host(
                "00:00:00:ff:%02x:%02x" % (pg_if.sw_if_index, j),
                "172.17.1%02x.%u" % (pg_if.sw_if_index, j),
                "2017:dead:%02x::%u" % (pg_if.sw_if_index, j))
            hosts.append(host)
|
|
|
|
def create_upper_layer(self, packet_index, proto, ports=0):
    """
    Build the transport layer for a generated packet.

    :param packet_index: index of the packet being built (not used).
    :param int proto: protocol number, looked up in ``self.proto_map``.
    :param int ports: 0 selects random source/destination ports from the
        configured class ranges; any other value is used as both the
        source and the destination port.
    :return: a UDP or TCP layer, or '' for any other protocol.
    """
    layer_name = self.proto_map[proto]

    if layer_name == 'UDP':
        if ports != 0:
            return UDP(sport=ports, dport=ports)
        src_port = random.randint(self.udp_sport_from, self.udp_sport_to)
        dst_port = random.randint(self.udp_dport_from, self.udp_dport_to)
        return UDP(sport=src_port, dport=dst_port)

    if layer_name == 'TCP':
        if ports != 0:
            return TCP(sport=ports, dport=ports)
        src_port = random.randint(self.tcp_sport_from, self.tcp_sport_to)
        dst_port = random.randint(self.tcp_dport_from, self.tcp_dport_to)
        return TCP(sport=src_port, dport=dst_port)

    return ''
|
|
|
|
def create_stream(self, src_if, packet_sizes, traffic_type=0, ipv6=0,
                  proto=-1, ports=0, fragments=False,
                  pkt_raw=True, etype=-1):
    """
    Create input packet stream for defined interface using the hosts list.

    One packet is built for every (destination host, source host) pair of
    every destination interface listed in ``self.flows[src_if]``.

    :param object src_if: Interface to create packet stream for.
    :param list packet_sizes: List of required packet sizes.
    :param traffic_type: ``self.ICMP`` builds an ICMP / ICMPv6 echo
        request layer; any other value builds the layer returned by
        create_upper_layer() for the packet's protocol.
    :param int ipv6: 1 = IPv6, 0 = IPv4, any other value = chosen at
        random per packet.
    :param int proto: protocol number for the packet; -1 picks one at
        random from ``self.proto[self.IP]``.
    :param int ports: forwarded to create_upper_layer().
    :param bool fragments: build the IP layer as a fragment (IPv6
        fragment extension header, or IPv4 flags/frag fields).
    :param bool pkt_raw: append the Raw payload carrying the packet info,
        store a copy of the packet in ``pkt_info.data`` and extend the
        packet to a size chosen at random from ``packet_sizes``.
    :param int etype: when > 0, used as the Ethernet type of the frame.
    :return: Stream of packets (empty list when ``src_if`` has no flow).
    """
    pkts = []
    if src_if not in self.flows:
        return pkts

    src_hosts = self.hosts_by_pg_idx[src_if.sw_if_index]
    n_src = len(src_hosts)
    for dst_if in self.flows[src_if]:
        dst_hosts = self.hosts_by_pg_idx[dst_if.sw_if_index]
        for i in range(len(dst_hosts) * n_src):
            # Floor division: a list index must be an int on Python 3.
            dst_host = dst_hosts[i // n_src]
            src_host = src_hosts[i % n_src]
            pkt_info = self.create_packet_info(src_if, dst_if)

            if ipv6 == 1:
                pkt_info.ip = 1
            elif ipv6 == 0:
                pkt_info.ip = 0
            else:
                pkt_info.ip = random.choice([0, 1])

            if proto == -1:
                pkt_info.proto = random.choice(self.proto[self.IP])
            else:
                pkt_info.proto = proto
            payload = self.info_to_payload(pkt_info)

            if etype > 0:
                p = Ether(dst=dst_host.mac, src=src_host.mac, type=etype)
            else:
                p = Ether(dst=dst_host.mac, src=src_host.mac)

            if pkt_info.ip:
                p /= IPv6(dst=dst_host.ip6, src=src_host.ip6)
                if fragments:
                    p /= IPv6ExtHdrFragment(offset=64, m=1)
            elif fragments:
                p /= IP(src=src_host.ip4, dst=dst_host.ip4,
                        flags=1, frag=64)
            else:
                p /= IP(src=src_host.ip4, dst=dst_host.ip4)

            if traffic_type == self.ICMP:
                if pkt_info.ip:
                    p /= ICMPv6EchoRequest(type=self.icmp6_type,
                                           code=self.icmp6_code)
                else:
                    p /= ICMP(type=self.icmp4_type,
                              code=self.icmp4_code)
            else:
                p /= self.create_upper_layer(i, pkt_info.proto, ports)

            if pkt_raw:
                p /= Raw(payload)
                # keep the un-extended packet for later comparison
                pkt_info.data = p.copy()
                size = random.choice(packet_sizes)
                self.extend_packet(p, size)
            pkts.append(p)
    return pkts
|
|
|
|
def verify_capture(self, pg_if, capture,
                   traffic_type=0, ip_type=0, etype=-1):
    """
    Verify captured input packet stream for defined interface.

    :param object pg_if: Interface to verify captured packet stream for.
    :param list capture: Captured packet stream.
    :param traffic_type: ``self.ICMP`` checks the ICMP type/code of each
        packet; any other value compares IP addresses and TCP/UDP ports
        against the packet recorded when the stream was created.
    :param int ip_type: IP version of the stream; when non-zero it is
        also asserted against the version stored in the payload info.
    :param int etype: when > 0, packets carrying this Ethernet type are
        skipped without further checks.
    """
    last_info = dict()
    for i in self.pg_interfaces:
        last_info[i.sw_if_index] = None
    dst_sw_if_index = pg_if.sw_if_index

    for packet in capture:
        if etype > 0:
            if packet[Ether].type != etype:
                # NOTE(review): a wrong ethertype is only logged, it does
                # not fail the test - confirm this is intended.
                self.logger.error(ppp("Unexpected ethertype in packet:",
                                      packet))
            else:
                continue

        try:
            # Raw data for ICMPv6 are stored in ICMPv6EchoRequest.data
            if traffic_type == self.ICMP and ip_type == self.IPV6:
                payload_info = self.payload_to_info(
                    packet[ICMPv6EchoRequest].data)
                payload = packet[ICMPv6EchoRequest]
            else:
                payload_info = self.payload_to_info(packet[Raw])
                payload = packet[self.proto_map[payload_info.proto]]
        except Exception:
            self.logger.error(ppp("Unexpected or invalid packet "
                                  "(outside network):", packet))
            raise

        if ip_type != 0:
            self.assertEqual(payload_info.ip, ip_type)

        if traffic_type == self.ICMP:
            try:
                if payload_info.ip == 0:
                    self.assertEqual(payload.type, self.icmp4_type)
                    self.assertEqual(payload.code, self.icmp4_code)
                else:
                    self.assertEqual(payload.type, self.icmp6_type)
                    self.assertEqual(payload.code, self.icmp6_code)
            except Exception:
                self.logger.error(ppp("Unexpected or invalid packet "
                                      "(outside network):", packet))
                raise
        else:
            try:
                ip_version = IPv6 if payload_info.ip == 1 else IP

                ip = packet[ip_version]
                packet_index = payload_info.index

                self.assertEqual(payload_info.dst, dst_sw_if_index)
                self.logger.debug("Got packet on port %s: src=%u (id=%u)" %
                                  (pg_if.name, payload_info.src,
                                   packet_index))
                next_info = self.get_next_packet_info_for_interface2(
                    payload_info.src, dst_sw_if_index,
                    last_info[payload_info.src])
                last_info[payload_info.src] = next_info
                self.assertIsNotNone(next_info)
                self.assertEqual(packet_index, next_info.index)
                saved_packet = next_info.data
                # Check standard fields
                self.assertEqual(ip.src, saved_packet[ip_version].src)
                self.assertEqual(ip.dst, saved_packet[ip_version].dst)
                p = self.proto_map[payload_info.proto]
                if p == 'TCP':
                    tcp = packet[TCP]
                    self.assertEqual(tcp.sport, saved_packet[TCP].sport)
                    self.assertEqual(tcp.dport, saved_packet[TCP].dport)
                elif p == 'UDP':
                    udp = packet[UDP]
                    self.assertEqual(udp.sport, saved_packet[UDP].sport)
                    self.assertEqual(udp.dport, saved_packet[UDP].dport)
            except Exception:
                self.logger.error(ppp("Unexpected or invalid packet:",
                                      packet))
                raise

    for i in self.pg_interfaces:
        # NOTE(review): the call above passes an integer source index
        # (payload_info.src) as first argument, while this one passes the
        # interface object itself - confirm whether i.sw_if_index was
        # intended. Left unchanged to keep the current pass/fail behaviour.
        remaining_packet = self.get_next_packet_info_for_interface2(
            i, dst_sw_if_index, last_info[i.sw_if_index])
        self.assertIsNone(
            remaining_packet,
            "Port %u: Packet expected from source %u didn't arrive" %
            (dst_sw_if_index, i.sw_if_index))
|
|
|
|
def run_traffic_no_check(self):
    """Send a default stream on every flow source without verifying it."""
    # Queue a stream on each interface that is the source of a flow
    for pg_if in self.pg_interfaces:
        if pg_if not in self.flows:
            continue
        stream = self.create_stream(pg_if, self.pg_if_packet_sizes)
        if stream:
            pg_if.add_stream(stream)

    # Enable packet capture and start packet sending
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
|
|
|
|
def run_verify_test(self, traffic_type=0, ip_type=0, proto=-1, ports=0,
                    frags=False, pkt_raw=True, etype=-1):
    """
    Send traffic on every flow and verify it arrives at the destinations.

    The arguments are forwarded to create_stream(); traffic_type, ip_type
    and etype are also forwarded to verify_capture(). Each destination
    interface is expected to capture as many packets as were queued in
    total.
    """
    # Queue the streams and count the packets that will be sent
    sent_total = 0
    for pg_if in self.pg_interfaces:
        if pg_if not in self.flows:
            continue
        stream = self.create_stream(pg_if, self.pg_if_packet_sizes,
                                    traffic_type, ip_type, proto, ports,
                                    frags, pkt_raw, etype)
        if stream:
            pg_if.add_stream(stream)
            sent_total += len(stream)

    # Enable packet capture and start packet sending
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()

    # Verify outgoing packet streams per packet-generator interface
    for src_if in self.pg_interfaces:
        if src_if not in self.flows:
            continue
        for dst_if in self.flows[src_if]:
            captured = dst_if.get_capture(sent_total)
            self.logger.info("Verifying capture on interface %s" %
                             dst_if.name)
            self.verify_capture(dst_if, captured,
                                traffic_type, ip_type, etype)
|
|
|
|
def run_verify_negat_test(self, traffic_type=0, ip_type=0, proto=-1,
                          ports=0, frags=False, etype=-1):
    """
    Send traffic on every flow and verify that nothing is captured.

    Packet infos are reset first; the arguments are forwarded to
    create_stream() with the Raw payload enabled.
    """
    self.reset_packet_infos()
    for pg_if in self.pg_interfaces:
        if pg_if not in self.flows:
            continue
        stream = self.create_stream(pg_if, self.pg_if_packet_sizes,
                                    traffic_type, ip_type, proto, ports,
                                    frags, True, etype)
        if stream:
            pg_if.add_stream(stream)

    # Enable packet capture and start packet sending
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()

    # Every destination interface must have captured zero packets
    for src_if in self.pg_interfaces:
        if src_if not in self.flows:
            continue
        for dst_if in self.flows[src_if]:
            self.logger.info("Verifying capture on interface %s" %
                             dst_if.name)
            captured = dst_if.get_capture(0)
            self.assertEqual(len(captured), 0)
|
|
|
|
def build_classify_table(self, src_mac='', dst_mac='', ether_type='',
                         etype='', key='mac', hit_next_index=0xffffffff):
    """
    Create a classify table and one session per pg0 host (or host pair).

    :param str src_mac: source MAC mask; when non-empty, sessions match
        the MAC of each pg0 host.
    :param str dst_mac: destination MAC mask; when non-empty, a session
        is created for every host behind every destination interface of
        the pg0 flow.
    :param str ether_type: ethernet type mask.
    :param str etype: ethernet type value to match.
    :param str key: key under which the table index is stored.
    :param int hit_next_index: forwarded to create_classify_session().
    """
    table_mask = self.build_mac_mask(src_mac=src_mac, dst_mac=dst_mac,
                                     ether_type=ether_type)
    self.create_classify_table(key, table_mask)

    for src_host in self.hosts_by_pg_idx[self.pg0.sw_if_index]:
        match_src = src_host.mac if src_mac else ''
        if dst_mac:
            match_dsts = [
                dst_host.mac
                for dst_if in self.flows[self.pg0]
                for dst_host in self.hosts_by_pg_idx[dst_if.sw_if_index]]
        else:
            # no destination mask: a single session without destination
            match_dsts = ['']
        for match_dst in match_dsts:
            self.create_classify_session(
                self.pg0, self.acl_tbl_idx.get(key),
                self.build_mac_match(src_mac=match_src,
                                     dst_mac=match_dst,
                                     ether_type=etype),
                hit_next_index=hit_next_index)
|
|
|
|
def test_0000_warmup_test(self):
    """ Learn the MAC addresses

    Creates two hosts and sends traffic without verifying the capture.
    """
    host_count = 2
    self.create_hosts(host_count)
    self.run_traffic_no_check()
|
|
|
|
def test_0010_inacl_permit_src_mac(self):
    """ Input L2 ACL test - permit source MAC

    Test scenario for an input L2 ACL matching the source MAC
        - Build a classify table with a full source MAC mask.
        - Apply it as input ACL on pg0.
        - Send an IPv4 stream pg0 -> pg1 and verify it arrives on pg1.
    """
    table_key = 'mac_in'
    self.build_classify_table(src_mac='ffffffffffff', key=table_key)
    table_index = self.acl_tbl_idx.get(table_key)
    self.input_acl_set_interface(self.pg0, table_index)
    self.acl_active_table = table_key
    self.run_verify_test(self.IP, self.IPV4, -1)
|
|
|
|
def test_0011_inacl_permit_dst_mac(self):
    """ Input L2 ACL test - permit destination MAC

    Test scenario for an input L2 ACL matching the destination MAC
        - Build a classify table with a full destination MAC mask.
        - Apply it as input ACL on pg0.
        - Send an IPv4 stream pg0 -> pg1 and verify it arrives on pg1.
    """
    table_key = 'mac_in'
    self.build_classify_table(dst_mac='ffffffffffff', key=table_key)
    table_index = self.acl_tbl_idx.get(table_key)
    self.input_acl_set_interface(self.pg0, table_index)
    self.acl_active_table = table_key
    self.run_verify_test(self.IP, self.IPV4, -1)
|
|
|
|
def test_0012_inacl_permit_src_dst_mac(self):
    """ Input L2 ACL test - permit source and destination MAC

    Test scenario for an input L2 ACL matching both MAC addresses
        - Build a classify table with full source and destination MAC
          masks.
        - Apply it as input ACL on pg0.
        - Send an IPv4 stream pg0 -> pg1 and verify it arrives on pg1.
    """
    table_key = 'mac_in'
    full_mask = 'ffffffffffff'
    self.build_classify_table(
        src_mac=full_mask, dst_mac=full_mask, key=table_key)
    table_index = self.acl_tbl_idx.get(table_key)
    self.input_acl_set_interface(self.pg0, table_index)
    self.acl_active_table = table_key
    self.run_verify_test(self.IP, self.IPV4, -1)
|
|
|
|
def test_0013_inacl_permit_ether_type(self):
    """ Input L2 ACL test - permit ether_type

    Test scenario for an input L2 ACL matching the ethernet type
        - Build a classify table with a full ethernet type mask and
          ETH_P_IP as the value to match.
        - Apply it as input ACL on pg0.
        - Send an IPv4 stream pg0 -> pg1 and verify it arrives on pg1.
    """
    table_key = 'mac_in'
    # hex() yields '0x...'; drop the prefix to get bare hex digits
    ip_etype_hex = hex(ETH_P_IP)[2:]
    self.build_classify_table(
        ether_type='ffff', etype=ip_etype_hex, key=table_key)
    table_index = self.acl_tbl_idx.get(table_key)
    self.input_acl_set_interface(self.pg0, table_index)
    self.acl_active_table = table_key
    self.run_verify_test(self.IP, self.IPV4, -1)
|
|
|
|
def test_0015_inacl_deny(self):
    """ Input L2 ACL test - deny

    Test scenario for an input L2 ACL matching the source MAC
        - Build a classify table with a full source MAC mask whose
          sessions use hit_next_index=0.
        - Apply it as input ACL on pg0.
        - Send an IPv4 stream pg0 -> pg1 and verify that no packets are
          received on pg1.
    """
    table_key = 'mac_in'
    self.build_classify_table(
        src_mac='ffffffffffff', hit_next_index=0, key=table_key)
    table_index = self.acl_tbl_idx.get(table_key)
    self.input_acl_set_interface(self.pg0, table_index)
    self.acl_active_table = table_key
    self.run_verify_negat_test(self.IP, self.IPV4, -1)
|
|
|
|
def test_0020_outacl_permit(self):
    """ Output L2 ACL test - permit

    Test scenario for an output L2 ACL matching the source MAC
        - Build a classify table with a full source MAC mask.
        - Apply it as output ACL on pg1.
        - Send an IPv4 stream pg0 -> pg1 and verify it arrives on pg1.
    """
    table_key = 'mac_out'
    self.build_classify_table(src_mac='ffffffffffff', key=table_key)
    table_index = self.acl_tbl_idx.get(table_key)
    self.output_acl_set_interface(self.pg1, table_index)
    self.acl_active_table = table_key
    self.run_verify_test(self.IP, self.IPV4, -1)
|
|
|
|
def test_0025_outacl_deny(self):
    """ Output L2 ACL test - deny

    Test scenario for an output L2 ACL matching the source MAC
        - Build a classify table with a full source MAC mask whose
          sessions use hit_next_index=0.
        - Apply it as output ACL on pg1.
        - Send an IPv4 stream pg0 -> pg1 and verify that no packets are
          received on pg1.
    """
    table_key = 'mac_out'
    self.build_classify_table(
        src_mac='ffffffffffff', hit_next_index=0, key=table_key)
    table_index = self.acl_tbl_idx.get(table_key)
    self.output_acl_set_interface(self.pg1, table_index)
    self.acl_active_table = table_key
    self.run_verify_negat_test(self.IP, self.IPV4, -1)
|
|
|
|
def test_0030_inoutacl_permit(self):
    """ Input+Output L2 ACL test - permit

    Test scenario for the same L2 ACL applied on input and output
        - Build a classify table with a full source MAC mask.
        - Apply it as output ACL on pg1 and as input ACL on pg0.
        - Send an IPv4 stream pg0 -> pg1 and verify it arrives on pg1.
    """
    table_key = 'mac_inout'
    self.build_classify_table(src_mac='ffffffffffff', key=table_key)
    self.output_acl_set_interface(
        self.pg1, self.acl_tbl_idx.get(table_key))
    self.input_acl_set_interface(
        self.pg0, self.acl_tbl_idx.get(table_key))
    self.acl_active_table = table_key
    self.run_verify_test(self.IP, self.IPV4, -1)
|
|
|
|
# Allow running this module directly; results go through VppTestRunner.
if __name__ == '__main__':
    unittest.main(testRunner=VppTestRunner)
|