vpp/test/vpp_qos.py
Neale Ranns 83832e7ced qos: Store function
Type: feature

store: write a QoS value into the buffer meta-data
record: Extract a QoS value from a packet header and store it.
mark: Make a change to the content of a packet header by writing a stored
      QoS value

Change-Id: I07d1e87dd1ca90d40ac1ae1774fee1b272cab83f
Signed-off-by: Neale Ranns <nranns@cisco.com>
2019-07-31 16:17:36 +00:00

149 lines
4.2 KiB
Python

"""
QoS
object abstractions for representing QoS config VPP
"""
from vpp_object import VppObject
class VppQosRecord(VppObject):
""" QoS Record(ing) configuration """
def __init__(self, test, intf, source):
self._test = test
self.intf = intf
self.source = source
def add_vpp_config(self):
self._test.vapi.qos_record_enable_disable(
enable=1,
record={'sw_if_index': self.intf.sw_if_index,
'input_source': self.source})
self._test.registry.register(self, self._test.logger)
return self
def remove_vpp_config(self):
self._test.vapi.qos_record_enable_disable(
enable=0,
record={'sw_if_index': self.intf.sw_if_index,
'input_source': self.source})
def query_vpp_config(self):
rs = self._test.vapi.qos_record_dump()
for r in rs:
if self.intf.sw_if_index == r.record.sw_if_index and \
self.source == r.record.input_source:
return True
return False
def object_id(self):
return ("qos-record-%s-%d" % (self.intf, self.source))
class VppQosStore(VppObject):
""" QoS Store(ing) configuration """
def __init__(self, test, intf, source, value):
self._test = test
self.intf = intf
self.source = source
self.value = value
def add_vpp_config(self):
self._test.vapi.qos_store_enable_disable(
enable=1,
store={'sw_if_index': self.intf.sw_if_index,
'input_source': self.source,
'value': self.value})
self._test.registry.register(self, self._test.logger)
return self
def remove_vpp_config(self):
self._test.vapi.qos_store_enable_disable(
enable=0,
store={'sw_if_index': self.intf.sw_if_index,
'input_source': self.source})
def query_vpp_config(self):
rs = self._test.vapi.qos_store_dump()
for r in rs:
if self.intf.sw_if_index == r.store.sw_if_index and \
self.source == r.store.input_source and \
self.value == r.store.value:
return True
return False
def object_id(self):
return ("qos-store-%s-%d" % (self.intf, self.source))
class VppQosEgressMap(VppObject):
""" QoS Egress Map(ping) configuration """
def __init__(self, test, id, rows):
self._test = test
self.id = id
self.rows = rows
def add_vpp_config(self):
self._test.vapi.qos_egress_map_update(
map={'id': self.id,
'rows': self.rows})
self._test.registry.register(self, self._test.logger)
return self
def remove_vpp_config(self):
self._test.vapi.qos_egress_map_delete(id=self.id)
def query_vpp_config(self):
rs = self._test.vapi.qos_egress_map_dump()
for r in rs:
if self.id == r.map.id:
return True
return False
def object_id(self):
return ("qos-map-%d" % (self.id))
class VppQosMark(VppObject):
""" QoS Mark(ing) configuration """
def __init__(self, test, intf, map, source):
self._test = test
self.intf = intf
self.source = source
self.map = map
def add_vpp_config(self):
self._test.vapi.qos_mark_enable_disable(
enable=1,
mark={'sw_if_index': self.intf.sw_if_index,
'map_id': self.map.id,
'output_source': self.source})
self._test.registry.register(self, self._test.logger)
return self
def remove_vpp_config(self):
self._test.vapi.qos_mark_enable_disable(
enable=0,
mark={'sw_if_index': self.intf.sw_if_index,
'output_source': self.source})
def query_vpp_config(self):
ms = self._test.vapi.qos_mark_dump()
for m in ms:
if self.intf.sw_if_index == m.mark.sw_if_index and \
self.source == m.mark.output_source and \
self.map.id == m.mark.map_id:
return True
return False
def object_id(self):
return ("qos-mark-%s-%d" % (self.intf, self.source))