vpp/test/vpp_srv6.py
Klement Sekera d9b0c6fbf7 tests: replace pycodestyle with black
Drop pycodestyle for code style checking in favor of black. Black is
much faster, stable PEP8 compliant code style checker offering also
automatic formatting. It aims to be very stable and produce smallest
diffs. It's used by many small and big projects.

Running checkstyle with black takes a few seconds with a terse output.
Thus, test-checkstyle-diff is no longer necessary.

Expand scope of checkstyle to all python files in the repo, replacing
test-checkstyle with checkstyle-python.

Also, fixstyle-python is now available for automatic style formatting.

Note: python virtualenv has been consolidated in test/Makefile,
test/requirements*.txt which will eventually be moved to a central
location.  This is required to simply the automated generation of
docker executor images in the CI.

Type: improvement
Change-Id: I022a326603485f58585e879ac0f697fceefbc9c8
Signed-off-by: Klement Sekera <klement.sekera@gmail.com>
Signed-off-by: Dave Wallace <dwallacelf@gmail.com>
2022-05-10 18:52:08 +00:00

214 lines
5.5 KiB
Python

"""
SRv6 LocalSIDs
object abstractions for representing SRv6 localSIDs in VPP
"""
from vpp_object import VppObject
from socket import inet_pton, inet_ntop, AF_INET, AF_INET6
class SRv6LocalSIDBehaviors:
# from src/vnet/srv6/sr.h
SR_BEHAVIOR_END = 1
SR_BEHAVIOR_X = 2
SR_BEHAVIOR_T = 3
SR_BEHAVIOR_D_FIRST = 4 # Unused. Separator in between regular and D
SR_BEHAVIOR_DX2 = 5
SR_BEHAVIOR_DX6 = 6
SR_BEHAVIOR_DX4 = 7
SR_BEHAVIOR_DT6 = 8
SR_BEHAVIOR_DT4 = 9
SR_BEHAVIOR_END_UN_PERF = 10
SR_BEHAVIOR_END_UN = 11
SR_BEHAVIOR_LAST = 12 # Must always be the last one
class SRv6PolicyType:
# from src/vnet/srv6/sr.h
SR_POLICY_TYPE_DEFAULT = 0
SR_POLICY_TYPE_SPRAY = 1
class SRv6PolicySteeringTypes:
# from src/vnet/srv6/sr.h
SR_STEER_L2 = 2
SR_STEER_IPV4 = 4
SR_STEER_IPV6 = 6
class VppSRv6LocalSID(VppObject):
"""
SRv6 LocalSID
"""
def __init__(
self,
test,
localsid,
behavior,
nh_addr,
end_psp,
sw_if_index,
vlan_index,
fib_table,
):
self._test = test
self.localsid = localsid
self.behavior = behavior
self.nh_addr = nh_addr
self.end_psp = end_psp
self.sw_if_index = sw_if_index
self.vlan_index = vlan_index
self.fib_table = fib_table
self._configured = False
def add_vpp_config(self):
self._test.vapi.sr_localsid_add_del(
localsid=self.localsid,
behavior=self.behavior,
nh_addr=self.nh_addr,
is_del=0,
end_psp=self.end_psp,
sw_if_index=self.sw_if_index,
vlan_index=self.vlan_index,
fib_table=self.fib_table,
)
self._configured = True
def remove_vpp_config(self):
self._test.vapi.sr_localsid_add_del(
localsid=self.localsid,
behavior=self.behavior,
nh_addr=self.nh_addr,
is_del=1,
end_psp=self.end_psp,
sw_if_index=self.sw_if_index,
vlan_index=self.vlan_index,
fib_table=self.fib_table,
)
self._configured = False
def query_vpp_config(self):
# sr_localsids_dump API is disabled
# use _configured flag for now
return self._configured
def object_id(self):
return "%d;%s,%d" % (self.fib_table, self.localsid, self.behavior)
class VppSRv6Policy(VppObject):
"""
SRv6 Policy
"""
def __init__(
self, test, bsid, is_encap, sr_type, weight, fib_table, segments, source
):
self._test = test
self.bsid = bsid
self.is_encap = is_encap
self.sr_type = sr_type
self.weight = weight
self.fib_table = fib_table
self.segments = segments
self.n_segments = len(segments)
# source not passed to API
# self.source = inet_pton(AF_INET6, source)
self.source = source
self._configured = False
def add_vpp_config(self):
self._test.vapi.sr_policy_add(
bsid=self.bsid,
weight=self.weight,
is_encap=self.is_encap,
is_spray=self.sr_type,
fib_table=self.fib_table,
sids={"num_sids": self.n_segments, "sids": self.segments},
)
self._configured = True
def remove_vpp_config(self):
self._test.vapi.sr_policy_del(self.bsid)
self._configured = False
def query_vpp_config(self):
# no API to query SR Policies
# use _configured flag for now
return self._configured
def object_id(self):
return "%d;%s-><%s>;%d" % (
self.sr_type,
self.bsid,
",".join(self.segments),
self.is_encap,
)
class VppSRv6Steering(VppObject):
"""
SRv6 Steering
"""
def __init__(
self,
test,
bsid,
prefix,
mask_width,
traffic_type,
sr_policy_index,
table_id,
sw_if_index,
):
self._test = test
self.bsid = bsid
self.prefix = prefix
self.mask_width = mask_width
self.traffic_type = traffic_type
self.sr_policy_index = sr_policy_index
self.sw_if_index = sw_if_index
self.table_id = table_id
self._configured = False
def add_vpp_config(self):
self._test.vapi.sr_steering_add_del(
is_del=0,
bsid=self.bsid,
sr_policy_index=self.sr_policy_index,
table_id=self.table_id,
prefix={"address": self.prefix, "len": self.mask_width},
sw_if_index=self.sw_if_index,
traffic_type=self.traffic_type,
)
self._configured = True
def remove_vpp_config(self):
self._test.vapi.sr_steering_add_del(
is_del=1,
bsid=self.bsid,
sr_policy_index=self.sr_policy_index,
table_id=self.table_id,
prefix={"address": self.prefix, "len": self.mask_width},
sw_if_index=self.sw_if_index,
traffic_type=self.traffic_type,
)
self._configured = False
def query_vpp_config(self):
# no API to query steering entries
# use _configured flag for now
return self._configured
def object_id(self):
return "%d;%d;%s/%d->%s" % (
self.table_id,
self.traffic_type,
self.prefix,
self.mask_width,
self.bsid,
)