vpp/test/vpp_acl.py
Klement Sekera d9b0c6fbf7 tests: replace pycodestyle with black
Drop pycodestyle for code style checking in favor of black. Black is
much faster, stable PEP8 compliant code style checker offering also
automatic formatting. It aims to be very stable and produce smallest
diffs. It's used by many small and big projects.

Running checkstyle with black takes a few seconds with a terse output.
Thus, test-checkstyle-diff is no longer necessary.

Expand scope of checkstyle to all python files in the repo, replacing
test-checkstyle with checkstyle-python.

Also, fixstyle-python is now available for automatic style formatting.

Note: python virtualenv has been consolidated in test/Makefile,
test/requirements*.txt which will eventually be moved to a central
location.  This is required to simply the automated generation of
docker executor images in the CI.

Type: improvement
Change-Id: I022a326603485f58585e879ac0f697fceefbc9c8
Signed-off-by: Klement Sekera <klement.sekera@gmail.com>
Signed-off-by: Dave Wallace <dwallacelf@gmail.com>
2022-05-10 18:52:08 +00:00

514 lines
15 KiB
Python

from ipaddress import IPv4Network
from vpp_object import VppObject
from vpp_papi import VppEnum
from vpp_ip import INVALID_INDEX
from vpp_papi_provider import UnexpectedApiReturnValueError
class VppAclPlugin(VppObject):
def __init__(self, test, enable_intf_counters=False):
self._test = test
self.enable_intf_counters = enable_intf_counters
@property
def enable_intf_counters(self):
return self._enable_intf_counters
@enable_intf_counters.setter
def enable_intf_counters(self, enable):
self.vapi.acl_stats_intf_counters_enable(enable=enable)
def add_vpp_config(self):
pass
def remove_vpp_config(self):
pass
def query_vpp_config(self):
pass
def object_id(self):
return "acl-plugin-%d" % (self._sw_if_index)
class AclRule:
"""ACL Rule"""
# port ranges
PORTS_ALL = -1
PORTS_RANGE = 0
PORTS_RANGE_2 = 1
udp_sport_from = 10
udp_sport_to = udp_sport_from + 5
udp_dport_from = 20000
udp_dport_to = udp_dport_from + 5000
tcp_sport_from = 30
tcp_sport_to = tcp_sport_from + 5
tcp_dport_from = 40000
tcp_dport_to = tcp_dport_from + 5000
udp_sport_from_2 = 90
udp_sport_to_2 = udp_sport_from_2 + 5
udp_dport_from_2 = 30000
udp_dport_to_2 = udp_dport_from_2 + 5000
tcp_sport_from_2 = 130
tcp_sport_to_2 = tcp_sport_from_2 + 5
tcp_dport_from_2 = 20000
tcp_dport_to_2 = tcp_dport_from_2 + 5000
icmp4_type = 8 # echo request
icmp4_code = 3
icmp6_type = 128 # echo request
icmp6_code = 3
icmp4_type_2 = 8
icmp4_code_from_2 = 5
icmp4_code_to_2 = 20
icmp6_type_2 = 128
icmp6_code_from_2 = 8
icmp6_code_to_2 = 42
def __init__(
self,
is_permit,
src_prefix=IPv4Network("0.0.0.0/0"),
dst_prefix=IPv4Network("0.0.0.0/0"),
proto=0,
ports=PORTS_ALL,
sport_from=None,
sport_to=None,
dport_from=None,
dport_to=None,
):
self.is_permit = is_permit
self.src_prefix = src_prefix
self.dst_prefix = dst_prefix
self._proto = proto
self._ports = ports
# assign ports by range
self.update_ports()
# assign specified ports
if sport_from:
self.sport_from = sport_from
if sport_to:
self.sport_to = sport_to
if dport_from:
self.dport_from = dport_from
if dport_to:
self.dport_to = dport_to
def __copy__(self):
new_rule = AclRule(
self.is_permit,
self.src_prefix,
self.dst_prefix,
self._proto,
self._ports,
self.sport_from,
self.sport_to,
self.dport_from,
self.dport_to,
)
return new_rule
def update_ports(self):
if self._ports == self.PORTS_ALL:
self.sport_from = 0
self.dport_from = 0
self.sport_to = 65535
if self._proto == 1 or self._proto == 58:
self.sport_to = 255
self.dport_to = self.sport_to
elif self._ports == self.PORTS_RANGE:
if self._proto == VppEnum.vl_api_ip_proto_t.IP_API_PROTO_ICMP:
self.sport_from = self.icmp4_type
self.sport_to = self.icmp4_type
self.dport_from = self.icmp4_code
self.dport_to = self.icmp4_code
elif self._proto == VppEnum.vl_api_ip_proto_t.IP_API_PROTO_ICMP6:
self.sport_from = self.icmp6_type
self.sport_to = self.icmp6_type
self.dport_from = self.icmp6_code
self.dport_to = self.icmp6_code
elif self._proto == VppEnum.vl_api_ip_proto_t.IP_API_PROTO_TCP:
self.sport_from = self.tcp_sport_from
self.sport_to = self.tcp_sport_to
self.dport_from = self.tcp_dport_from
self.dport_to = self.tcp_dport_to
elif self._proto == VppEnum.vl_api_ip_proto_t.IP_API_PROTO_UDP:
self.sport_from = self.udp_sport_from
self.sport_to = self.udp_sport_to
self.dport_from = self.udp_dport_from
self.dport_to = self.udp_dport_to
elif self._ports == self.PORTS_RANGE_2:
if self._proto == VppEnum.vl_api_ip_proto_t.IP_API_PROTO_ICMP:
self.sport_from = self.icmp4_type_2
self.sport_to = self.icmp4_type_2
self.dport_from = self.icmp4_code_from_2
self.dport_to = self.icmp4_code_to_2
elif self._proto == VppEnum.vl_api_ip_proto_t.IP_API_PROTO_ICMP6:
self.sport_from = self.icmp6_type_2
self.sport_to = self.icmp6_type_2
self.dport_from = self.icmp6_code_from_2
self.dport_to = self.icmp6_code_to_2
elif self._proto == VppEnum.vl_api_ip_proto_t.IP_API_PROTO_TCP:
self.sport_from = self.tcp_sport_from_2
self.sport_to = self.tcp_sport_to_2
self.dport_from = self.tcp_dport_from_2
self.dport_to = self.tcp_dport_to_2
elif self._proto == VppEnum.vl_api_ip_proto_t.IP_API_PROTO_UDP:
self.sport_from = self.udp_sport_from_2
self.sport_to = self.udp_sport_to_2
self.dport_from = self.udp_dport_from_2
self.dport_to = self.udp_dport_to_2
else:
self.sport_from = self._ports
self.sport_to = self._ports
self.dport_from = self._ports
self.dport_to = self._ports
@property
def proto(self):
return self._proto
@proto.setter
def proto(self, proto):
self._proto = proto
self.update_ports()
@property
def ports(self):
return self._ports
@ports.setter
def ports(self, ports):
self._ports = ports
self.update_ports()
def encode(self):
return {
"is_permit": self.is_permit,
"proto": self.proto,
"srcport_or_icmptype_first": self.sport_from,
"srcport_or_icmptype_last": self.sport_to,
"src_prefix": self.src_prefix,
"dstport_or_icmpcode_first": self.dport_from,
"dstport_or_icmpcode_last": self.dport_to,
"dst_prefix": self.dst_prefix,
}
class VppAcl(VppObject):
"""VPP ACL"""
def __init__(self, test, rules, acl_index=INVALID_INDEX, tag=None):
self._test = test
self._acl_index = acl_index
self.tag = tag
self._rules = rules
@property
def rules(self):
return self._rules
@property
def acl_index(self):
return self._acl_index
@property
def count(self):
return len(self._rules)
def encode_rules(self):
rules = []
for rule in self._rules:
rules.append(rule.encode())
return rules
def add_vpp_config(self, expect_error=False):
try:
reply = self._test.vapi.acl_add_replace(
acl_index=self._acl_index,
tag=self.tag,
count=self.count,
r=self.encode_rules(),
)
self._acl_index = reply.acl_index
self._test.registry.register(self, self._test.logger)
if expect_error:
self._test.fail("Unexpected api reply")
return self
except UnexpectedApiReturnValueError:
if not expect_error:
self._test.fail("Unexpected api reply")
return None
def modify_vpp_config(self, rules):
self._rules = rules
self.add_vpp_config()
def remove_vpp_config(self, expect_error=False):
try:
self._test.vapi.acl_del(acl_index=self._acl_index)
if expect_error:
self._test.fail("Unexpected api reply")
except UnexpectedApiReturnValueError:
if not expect_error:
self._test.fail("Unexpected api reply")
def dump(self):
return self._test.vapi.acl_dump(acl_index=self._acl_index)
def query_vpp_config(self):
dump = self.dump()
for rule in dump:
if rule.acl_index == self._acl_index:
return True
return False
def object_id(self):
return "acl-%s-%d" % (self.tag, self._acl_index)
class VppEtypeWhitelist(VppObject):
"""VPP Etype Whitelist"""
def __init__(self, test, sw_if_index, whitelist, n_input=0):
self._test = test
self.whitelist = whitelist
self.n_input = n_input
self._sw_if_index = sw_if_index
@property
def sw_if_index(self):
return self._sw_if_index
@property
def count(self):
return len(self.whitelist)
def add_vpp_config(self):
self._test.vapi.acl_interface_set_etype_whitelist(
sw_if_index=self._sw_if_index,
count=self.count,
n_input=self.n_input,
whitelist=self.whitelist,
)
self._test.registry.register(self, self._test.logger)
return self
def remove_vpp_config(self):
self._test.vapi.acl_interface_set_etype_whitelist(
sw_if_index=self._sw_if_index, count=0, n_input=0, whitelist=[]
)
def query_vpp_config(self):
self._test.vapi.acl_interface_etype_whitelist_dump(
sw_if_index=self._sw_if_index
)
return False
def object_id(self):
return "acl-etype_wl-%d" % (self._sw_if_index)
class VppAclInterface(VppObject):
"""VPP ACL Interface"""
def __init__(self, test, sw_if_index, acls, n_input=0):
self._test = test
self._sw_if_index = sw_if_index
self.n_input = n_input
self.acls = acls
@property
def sw_if_index(self):
return self._sw_if_index
@property
def count(self):
return len(self.acls)
def encode_acls(self):
acls = []
for acl in self.acls:
acls.append(acl.acl_index)
return acls
def add_vpp_config(self, expect_error=False):
try:
reply = self._test.vapi.acl_interface_set_acl_list(
sw_if_index=self._sw_if_index,
n_input=self.n_input,
count=self.count,
acls=self.encode_acls(),
)
self._test.registry.register(self, self._test.logger)
if expect_error:
self._test.fail("Unexpected api reply")
return self
except UnexpectedApiReturnValueError:
if not expect_error:
self._test.fail("Unexpected api reply")
return None
def remove_vpp_config(self, expect_error=False):
try:
reply = self._test.vapi.acl_interface_set_acl_list(
sw_if_index=self._sw_if_index, n_input=0, count=0, acls=[]
)
if expect_error:
self._test.fail("Unexpected api reply")
except UnexpectedApiReturnValueError:
if not expect_error:
self._test.fail("Unexpected api reply")
def query_vpp_config(self):
dump = self._test.vapi.acl_interface_list_dump(sw_if_index=self._sw_if_index)
for acl_list in dump:
if acl_list.count > 0:
return True
return False
def object_id(self):
return "acl-if-list-%d" % (self._sw_if_index)
class MacipRule:
"""Mac Ip rule"""
def __init__(
self, is_permit, src_mac=0, src_mac_mask=0, src_prefix=IPv4Network("0.0.0.0/0")
):
self.is_permit = is_permit
self.src_mac = src_mac
self.src_mac_mask = src_mac_mask
self.src_prefix = src_prefix
def encode(self):
return {
"is_permit": self.is_permit,
"src_mac": self.src_mac,
"src_mac_mask": self.src_mac_mask,
"src_prefix": self.src_prefix,
}
class VppMacipAcl(VppObject):
"""Vpp Mac Ip ACL"""
def __init__(self, test, rules, acl_index=INVALID_INDEX, tag=None):
self._test = test
self._acl_index = acl_index
self.tag = tag
self._rules = rules
@property
def acl_index(self):
return self._acl_index
@property
def rules(self):
return self._rules
@property
def count(self):
return len(self._rules)
def encode_rules(self):
rules = []
for rule in self._rules:
rules.append(rule.encode())
return rules
def add_vpp_config(self, expect_error=False):
try:
reply = self._test.vapi.macip_acl_add_replace(
acl_index=self._acl_index,
tag=self.tag,
count=self.count,
r=self.encode_rules(),
)
self._acl_index = reply.acl_index
self._test.registry.register(self, self._test.logger)
if expect_error:
self._test.fail("Unexpected api reply")
return self
except UnexpectedApiReturnValueError:
if not expect_error:
self._test.fail("Unexpected api reply")
return None
def modify_vpp_config(self, rules):
self._rules = rules
self.add_vpp_config()
def remove_vpp_config(self, expect_error=False):
try:
self._test.vapi.macip_acl_del(acl_index=self._acl_index)
if expect_error:
self._test.fail("Unexpected api reply")
except UnexpectedApiReturnValueError:
if not expect_error:
self._test.fail("Unexpected api reply")
def dump(self):
return self._test.vapi.macip_acl_dump(acl_index=self._acl_index)
def query_vpp_config(self):
dump = self.dump()
for rule in dump:
if rule.acl_index == self._acl_index:
return True
return False
def object_id(self):
return "macip-acl-%s-%d" % (self.tag, self._acl_index)
class VppMacipAclInterface(VppObject):
"""VPP Mac Ip ACL Interface"""
def __init__(self, test, sw_if_index, acls):
self._test = test
self._sw_if_index = sw_if_index
self.acls = acls
@property
def sw_if_index(self):
return self._sw_if_index
@property
def count(self):
return len(self.acls)
def add_vpp_config(self):
for acl in self.acls:
self._test.vapi.macip_acl_interface_add_del(
is_add=True, sw_if_index=self._sw_if_index, acl_index=acl.acl_index
)
self._test.registry.register(self, self._test.logger)
def remove_vpp_config(self):
for acl in self.acls:
self._test.vapi.macip_acl_interface_add_del(
is_add=False, sw_if_index=self._sw_if_index, acl_index=acl.acl_index
)
def dump(self):
return self._test.vapi.macip_acl_interface_list_dump(
sw_if_index=self._sw_if_index
)
def query_vpp_config(self):
dump = self.dump()
for acl_list in dump:
for acl_index in acl_list.acls:
if acl_index != INVALID_INDEX:
return True
return False
def object_id(self):
return "macip-acl-if-list-%d" % (self._sw_if_index)