514 lines
15 KiB
Python
Raw Permalink Normal View History

from ipaddress import IPv4Network
from vpp_object import VppObject
from vpp_papi import VppEnum
from vpp_ip import INVALID_INDEX
from vpp_papi_provider import UnexpectedApiReturnValueError
class VppAclPlugin(VppObject):
def __init__(self, test, enable_intf_counters=False):
self._test = test
self.enable_intf_counters = enable_intf_counters
@property
def enable_intf_counters(self):
return self._enable_intf_counters
@enable_intf_counters.setter
def enable_intf_counters(self, enable):
self.vapi.acl_stats_intf_counters_enable(enable=enable)
def add_vpp_config(self):
pass
def remove_vpp_config(self):
pass
def query_vpp_config(self):
pass
def object_id(self):
return "acl-plugin-%d" % (self._sw_if_index)
class AclRule:
"""ACL Rule"""
# port ranges
PORTS_ALL = -1
PORTS_RANGE = 0
PORTS_RANGE_2 = 1
udp_sport_from = 10
udp_sport_to = udp_sport_from + 5
udp_dport_from = 20000
udp_dport_to = udp_dport_from + 5000
tcp_sport_from = 30
tcp_sport_to = tcp_sport_from + 5
tcp_dport_from = 40000
tcp_dport_to = tcp_dport_from + 5000
udp_sport_from_2 = 90
udp_sport_to_2 = udp_sport_from_2 + 5
udp_dport_from_2 = 30000
udp_dport_to_2 = udp_dport_from_2 + 5000
tcp_sport_from_2 = 130
tcp_sport_to_2 = tcp_sport_from_2 + 5
tcp_dport_from_2 = 20000
tcp_dport_to_2 = tcp_dport_from_2 + 5000
icmp4_type = 8 # echo request
icmp4_code = 3
icmp6_type = 128 # echo request
icmp6_code = 3
icmp4_type_2 = 8
icmp4_code_from_2 = 5
icmp4_code_to_2 = 20
icmp6_type_2 = 128
icmp6_code_from_2 = 8
icmp6_code_to_2 = 42
def __init__(
self,
is_permit,
src_prefix=IPv4Network("0.0.0.0/0"),
dst_prefix=IPv4Network("0.0.0.0/0"),
proto=0,
ports=PORTS_ALL,
sport_from=None,
sport_to=None,
dport_from=None,
dport_to=None,
):
self.is_permit = is_permit
self.src_prefix = src_prefix
self.dst_prefix = dst_prefix
self._proto = proto
self._ports = ports
# assign ports by range
self.update_ports()
# assign specified ports
if sport_from:
self.sport_from = sport_from
if sport_to:
self.sport_to = sport_to
if dport_from:
self.dport_from = dport_from
if dport_to:
self.dport_to = dport_to
def __copy__(self):
new_rule = AclRule(
self.is_permit,
self.src_prefix,
self.dst_prefix,
self._proto,
self._ports,
self.sport_from,
self.sport_to,
self.dport_from,
self.dport_to,
)
return new_rule
def update_ports(self):
if self._ports == self.PORTS_ALL:
self.sport_from = 0
self.dport_from = 0
self.sport_to = 65535
if self._proto == 1 or self._proto == 58:
self.sport_to = 255
self.dport_to = self.sport_to
elif self._ports == self.PORTS_RANGE:
if self._proto == VppEnum.vl_api_ip_proto_t.IP_API_PROTO_ICMP:
self.sport_from = self.icmp4_type
self.sport_to = self.icmp4_type
self.dport_from = self.icmp4_code
self.dport_to = self.icmp4_code
elif self._proto == VppEnum.vl_api_ip_proto_t.IP_API_PROTO_ICMP6:
self.sport_from = self.icmp6_type
self.sport_to = self.icmp6_type
self.dport_from = self.icmp6_code
self.dport_to = self.icmp6_code
elif self._proto == VppEnum.vl_api_ip_proto_t.IP_API_PROTO_TCP:
self.sport_from = self.tcp_sport_from
self.sport_to = self.tcp_sport_to
self.dport_from = self.tcp_dport_from
self.dport_to = self.tcp_dport_to
elif self._proto == VppEnum.vl_api_ip_proto_t.IP_API_PROTO_UDP:
self.sport_from = self.udp_sport_from
self.sport_to = self.udp_sport_to
self.dport_from = self.udp_dport_from
self.dport_to = self.udp_dport_to
elif self._ports == self.PORTS_RANGE_2:
if self._proto == VppEnum.vl_api_ip_proto_t.IP_API_PROTO_ICMP:
self.sport_from = self.icmp4_type_2
self.sport_to = self.icmp4_type_2
self.dport_from = self.icmp4_code_from_2
self.dport_to = self.icmp4_code_to_2
elif self._proto == VppEnum.vl_api_ip_proto_t.IP_API_PROTO_ICMP6:
self.sport_from = self.icmp6_type_2
self.sport_to = self.icmp6_type_2
self.dport_from = self.icmp6_code_from_2
self.dport_to = self.icmp6_code_to_2
elif self._proto == VppEnum.vl_api_ip_proto_t.IP_API_PROTO_TCP:
self.sport_from = self.tcp_sport_from_2
self.sport_to = self.tcp_sport_to_2
self.dport_from = self.tcp_dport_from_2
self.dport_to = self.tcp_dport_to_2
elif self._proto == VppEnum.vl_api_ip_proto_t.IP_API_PROTO_UDP:
self.sport_from = self.udp_sport_from_2
self.sport_to = self.udp_sport_to_2
self.dport_from = self.udp_dport_from_2
self.dport_to = self.udp_dport_to_2
else:
self.sport_from = self._ports
self.sport_to = self._ports
self.dport_from = self._ports
self.dport_to = self._ports
@property
def proto(self):
return self._proto
@proto.setter
def proto(self, proto):
self._proto = proto
self.update_ports()
@property
def ports(self):
return self._ports
@ports.setter
def ports(self, ports):
self._ports = ports
self.update_ports()
def encode(self):
return {
"is_permit": self.is_permit,
"proto": self.proto,
"srcport_or_icmptype_first": self.sport_from,
"srcport_or_icmptype_last": self.sport_to,
"src_prefix": self.src_prefix,
"dstport_or_icmpcode_first": self.dport_from,
"dstport_or_icmpcode_last": self.dport_to,
"dst_prefix": self.dst_prefix,
}
class VppAcl(VppObject):
"""VPP ACL"""
def __init__(self, test, rules, acl_index=INVALID_INDEX, tag=None):
self._test = test
self._acl_index = acl_index
self.tag = tag
self._rules = rules
@property
def rules(self):
return self._rules
@property
def acl_index(self):
return self._acl_index
@property
def count(self):
return len(self._rules)
def encode_rules(self):
rules = []
for rule in self._rules:
rules.append(rule.encode())
return rules
def add_vpp_config(self, expect_error=False):
try:
reply = self._test.vapi.acl_add_replace(
acl_index=self._acl_index,
tag=self.tag,
count=self.count,
r=self.encode_rules(),
)
self._acl_index = reply.acl_index
self._test.registry.register(self, self._test.logger)
if expect_error:
self._test.fail("Unexpected api reply")
return self
except UnexpectedApiReturnValueError:
if not expect_error:
self._test.fail("Unexpected api reply")
return None
def modify_vpp_config(self, rules):
self._rules = rules
self.add_vpp_config()
def remove_vpp_config(self, expect_error=False):
try:
self._test.vapi.acl_del(acl_index=self._acl_index)
if expect_error:
self._test.fail("Unexpected api reply")
except UnexpectedApiReturnValueError:
if not expect_error:
self._test.fail("Unexpected api reply")
def dump(self):
return self._test.vapi.acl_dump(acl_index=self._acl_index)
def query_vpp_config(self):
dump = self.dump()
for rule in dump:
if rule.acl_index == self._acl_index:
return True
return False
def object_id(self):
return "acl-%s-%d" % (self.tag, self._acl_index)
class VppEtypeWhitelist(VppObject):
"""VPP Etype Whitelist"""
def __init__(self, test, sw_if_index, whitelist, n_input=0):
self._test = test
self.whitelist = whitelist
self.n_input = n_input
self._sw_if_index = sw_if_index
@property
def sw_if_index(self):
return self._sw_if_index
@property
def count(self):
return len(self.whitelist)
def add_vpp_config(self):
self._test.vapi.acl_interface_set_etype_whitelist(
sw_if_index=self._sw_if_index,
count=self.count,
n_input=self.n_input,
whitelist=self.whitelist,
)
self._test.registry.register(self, self._test.logger)
return self
def remove_vpp_config(self):
self._test.vapi.acl_interface_set_etype_whitelist(
sw_if_index=self._sw_if_index, count=0, n_input=0, whitelist=[]
)
def query_vpp_config(self):
self._test.vapi.acl_interface_etype_whitelist_dump(
sw_if_index=self._sw_if_index
)
return False
def object_id(self):
return "acl-etype_wl-%d" % (self._sw_if_index)
class VppAclInterface(VppObject):
"""VPP ACL Interface"""
def __init__(self, test, sw_if_index, acls, n_input=0):
self._test = test
self._sw_if_index = sw_if_index
self.n_input = n_input
self.acls = acls
@property
def sw_if_index(self):
return self._sw_if_index
@property
def count(self):
return len(self.acls)
def encode_acls(self):
acls = []
for acl in self.acls:
acls.append(acl.acl_index)
return acls
def add_vpp_config(self, expect_error=False):
try:
reply = self._test.vapi.acl_interface_set_acl_list(
sw_if_index=self._sw_if_index,
n_input=self.n_input,
count=self.count,
acls=self.encode_acls(),
)
self._test.registry.register(self, self._test.logger)
if expect_error:
self._test.fail("Unexpected api reply")
return self
except UnexpectedApiReturnValueError:
if not expect_error:
self._test.fail("Unexpected api reply")
return None
def remove_vpp_config(self, expect_error=False):
try:
reply = self._test.vapi.acl_interface_set_acl_list(
sw_if_index=self._sw_if_index, n_input=0, count=0, acls=[]
)
if expect_error:
self._test.fail("Unexpected api reply")
except UnexpectedApiReturnValueError:
if not expect_error:
self._test.fail("Unexpected api reply")
def query_vpp_config(self):
dump = self._test.vapi.acl_interface_list_dump(sw_if_index=self._sw_if_index)
for acl_list in dump:
if acl_list.count > 0:
return True
return False
def object_id(self):
return "acl-if-list-%d" % (self._sw_if_index)
class MacipRule:
"""Mac Ip rule"""
def __init__(
self, is_permit, src_mac=0, src_mac_mask=0, src_prefix=IPv4Network("0.0.0.0/0")
):
self.is_permit = is_permit
self.src_mac = src_mac
self.src_mac_mask = src_mac_mask
self.src_prefix = src_prefix
def encode(self):
return {
"is_permit": self.is_permit,
"src_mac": self.src_mac,
"src_mac_mask": self.src_mac_mask,
"src_prefix": self.src_prefix,
}
class VppMacipAcl(VppObject):
"""Vpp Mac Ip ACL"""
def __init__(self, test, rules, acl_index=INVALID_INDEX, tag=None):
self._test = test
self._acl_index = acl_index
self.tag = tag
self._rules = rules
@property
def acl_index(self):
return self._acl_index
@property
def rules(self):
return self._rules
@property
def count(self):
return len(self._rules)
def encode_rules(self):
rules = []
for rule in self._rules:
rules.append(rule.encode())
return rules
def add_vpp_config(self, expect_error=False):
try:
reply = self._test.vapi.macip_acl_add_replace(
acl_index=self._acl_index,
tag=self.tag,
count=self.count,
r=self.encode_rules(),
)
self._acl_index = reply.acl_index
self._test.registry.register(self, self._test.logger)
if expect_error:
self._test.fail("Unexpected api reply")
return self
except UnexpectedApiReturnValueError:
if not expect_error:
self._test.fail("Unexpected api reply")
return None
def modify_vpp_config(self, rules):
self._rules = rules
self.add_vpp_config()
def remove_vpp_config(self, expect_error=False):
try:
self._test.vapi.macip_acl_del(acl_index=self._acl_index)
if expect_error:
self._test.fail("Unexpected api reply")
except UnexpectedApiReturnValueError:
if not expect_error:
self._test.fail("Unexpected api reply")
def dump(self):
return self._test.vapi.macip_acl_dump(acl_index=self._acl_index)
def query_vpp_config(self):
dump = self.dump()
for rule in dump:
if rule.acl_index == self._acl_index:
return True
return False
def object_id(self):
return "macip-acl-%s-%d" % (self.tag, self._acl_index)
class VppMacipAclInterface(VppObject):
"""VPP Mac Ip ACL Interface"""
def __init__(self, test, sw_if_index, acls):
self._test = test
self._sw_if_index = sw_if_index
self.acls = acls
@property
def sw_if_index(self):
return self._sw_if_index
@property
def count(self):
return len(self.acls)
def add_vpp_config(self):
for acl in self.acls:
self._test.vapi.macip_acl_interface_add_del(
is_add=True, sw_if_index=self._sw_if_index, acl_index=acl.acl_index
)
self._test.registry.register(self, self._test.logger)
def remove_vpp_config(self):
for acl in self.acls:
self._test.vapi.macip_acl_interface_add_del(
is_add=False, sw_if_index=self._sw_if_index, acl_index=acl.acl_index
)
def dump(self):
return self._test.vapi.macip_acl_interface_list_dump(
sw_if_index=self._sw_if_index
)
def query_vpp_config(self):
dump = self.dump()
for acl_list in dump:
for acl_index in acl_list.acls:
if acl_index != INVALID_INDEX:
return True
return False
def object_id(self):
return "macip-acl-if-list-%d" % (self._sw_if_index)