make test: improve documentation and PEP8 compliance

Change-Id: Ib4f0353aab6112fcc3c3d8f0bcbed5bc4b567b9b
Signed-off-by: Klement Sekera <ksekera@cisco.com>
This commit is contained in:
Klement Sekera
2017-01-04 12:58:53 +01:00
committed by Damjan Marion
parent a48ad28256
commit da505f608e
19 changed files with 468 additions and 156 deletions
+2 -2
View File
@@ -17,7 +17,7 @@ PAPI_INSTALL_DONE=$(VPP_PYTHON_PREFIX)/papi-install.done
PAPI_INSTALL_FLAGS=$(PIP_INSTALL_DONE) $(PIP_PATCH_DONE) $(PAPI_INSTALL_DONE)
$(PIP_INSTALL_DONE):
@virtualenv $(PYTHON_VENV_PATH)
@virtualenv $(PYTHON_VENV_PATH) -p python2.7
@bash -c "source $(PYTHON_VENV_PATH)/bin/activate && pip install $(PYTHON_DEPENDS)"
@touch $@
@@ -54,7 +54,7 @@ wipe: reset
@rm -f $(PAPI_INSTALL_FLAGS)
doc: verify-python-path
@virtualenv $(PYTHON_VENV_PATH)
@virtualenv $(PYTHON_VENV_PATH) -p python2.7
@bash -c "source $(PYTHON_VENV_PATH)/bin/activate && pip install $(PYTHON_DEPENDS) sphinx"
@bash -c "source $(PYTHON_VENV_PATH)/bin/activate && make -C doc WS_ROOT=$(WS_ROOT) BR=$(BR) NO_VPP_PAPI=1 html"
+276 -9
View File
File diff suppressed because it is too large Load Diff
+12 -11
View File
@@ -117,7 +117,8 @@ class VppTestCase(unittest.TestCase):
debug_cli = ""
if cls.step or cls.debug_gdb or cls.debug_gdbserver:
debug_cli = "cli-listen localhost:5002"
cls.vpp_cmdline = [cls.vpp_bin, "unix", "{", "nodaemon", debug_cli, "}",
cls.vpp_cmdline = [cls.vpp_bin,
"unix", "{", "nodaemon", debug_cli, "}",
"api-segment", "{", "prefix", cls.shm_prefix, "}"]
if cls.plugin_path is not None:
cls.vpp_cmdline.extend(["plugin_path", cls.plugin_path])
@@ -246,8 +247,8 @@ class VppTestCase(unittest.TestCase):
print(double_line_delim)
print("VPP or GDB server is still running")
print(single_line_delim)
raw_input("When done debugging, press ENTER to kill the process"
" and finish running the testcase...")
raw_input("When done debugging, press ENTER to kill the "
"process and finish running the testcase...")
if hasattr(cls, 'vpp'):
if hasattr(cls, 'vapi'):
@@ -563,10 +564,10 @@ class VppTestResult(unittest.TestResult):
def __init__(self, stream, descriptions, verbosity):
"""
:param stream File descriptor to store where to report test results. Set
to the standard error stream by default.
:param descriptions Boolean variable to store information if to use test
case descriptions.
:param stream File descriptor to store where to report test results.
Set to the standard error stream by default.
:param descriptions Boolean variable to store information if to use
test case descriptions.
:param verbosity Integer variable to store required verbosity level.
"""
unittest.TestResult.__init__(self, stream, descriptions, verbosity)
@@ -664,12 +665,12 @@ class VppTestResult(unittest.TestResult):
unittest.TestResult.stopTest(self, test)
if self.verbosity > 0:
self.stream.writeln(single_line_delim)
self.stream.writeln("%-60s%s" %
(self.getDescription(test), self.result_string))
self.stream.writeln("%-60s%s" % (self.getDescription(test),
self.result_string))
self.stream.writeln(single_line_delim)
else:
self.stream.writeln("%-60s%s" %
(self.getDescription(test), self.result_string))
self.stream.writeln("%-60s%s" % (self.getDescription(test),
self.result_string))
def printErrors(self):
"""
+2 -2
View File
@@ -105,8 +105,8 @@ class PollHook(Hook):
s = signaldict[abs(self.testcase.vpp.returncode)]
else:
s = "unknown"
msg = "VPP subprocess died unexpectedly with returncode %d [%s]" % (
self.testcase.vpp.returncode, s)
msg = "VPP subprocess died unexpectedly with returncode %d [%s]" %\
(self.testcase.vpp.returncode, s)
self.logger.critical(msg)
core_path = self.testcase.tempdir + '/core'
if os.path.isfile(core_path):
+2 -2
View File
@@ -75,8 +75,8 @@ class BridgeDomain(object):
self.pg_start()
# Pick first received frame and check if it's the
# non-encapsulated frame
# Pick first received frame and check if it's the non-encapsulated
# frame
out = self.pg1.get_capture(1)
pkt = out[0]
self.assert_eq_pkts(pkt, self.frame_request)
+9 -5
View File
@@ -114,8 +114,9 @@ class TestClassifier(VppTestCase):
payload_info = self.payload_to_info(str(packet[Raw]))
packet_index = payload_info.index
self.assertEqual(payload_info.dst, dst_sw_if_index)
self.logger.debug("Got packet on port %s: src=%u (id=%u)" %
(dst_if.name, payload_info.src, packet_index))
self.logger.debug(
"Got packet on port %s: src=%u (id=%u)" %
(dst_if.name, payload_info.src, packet_index))
next_info = self.get_next_packet_info_for_interface2(
payload_info.src, dst_sw_if_index,
last_info[payload_info.src])
@@ -296,8 +297,9 @@ class TestClassifier(VppTestCase):
pkts = self.create_stream(self.pg0, self.pg2, self.pg_if_packet_sizes)
self.pg0.add_stream(pkts)
self.create_classify_table(
'mac', self.build_mac_mask(src_mac='ffffffffffff'), data_offset=-14)
self.create_classify_table('mac',
self.build_mac_mask(src_mac='ffffffffffff'),
data_offset=-14)
self.create_classify_session(
self.pg0, self.acl_tbl_idx.get('mac'),
self.build_mac_match(src_mac=self.pg0.remote_mac))
@@ -326,7 +328,9 @@ class TestClassifier(VppTestCase):
pkts = self.create_stream(self.pg0, self.pg3, self.pg_if_packet_sizes)
self.pg0.add_stream(pkts)
self.create_classify_table('pbr', self.build_ip_mask(src_ip='ffffffff'))
self.create_classify_table(
'pbr', self.build_ip_mask(
src_ip='ffffffff'))
pbr_option = 1
self.create_classify_session(
self.pg0, self.acl_tbl_idx.get('pbr'),
+2 -2
View File
@@ -57,8 +57,8 @@ class TestFlowperpkt(VppTestCase):
if len(payload) * 2 != len(masked_expected_data):
return False
# iterate over pairs: raw byte from payload and ASCII code for that byte
# from masked payload (or XX if masked)
# iterate over pairs: raw byte from payload and ASCII code for that
# byte from masked payload (or XX if masked)
for i in range(len(payload)):
p = payload[i]
m = masked_expected_data[2 * i:2 * i + 2]
+10 -7
View File
@@ -148,8 +148,9 @@ class TestIPv4(VppTestCase):
payload_info = self.payload_to_info(str(packet[Raw]))
packet_index = payload_info.index
self.assertEqual(payload_info.dst, dst_sw_if_index)
self.logger.debug("Got packet on port %s: src=%u (id=%u)" %
(dst_if.name, payload_info.src, packet_index))
self.logger.debug(
"Got packet on port %s: src=%u (id=%u)" %
(dst_if.name, payload_info.src, packet_index))
next_info = self.get_next_packet_info_for_interface2(
payload_info.src, dst_sw_if_index,
last_info[payload_info.src])
@@ -209,7 +210,7 @@ class TestIPv4FibCrud(VppTestCase):
- add new 1k,
- del 1.5k
..note:: Python API is to slow to add many routes, needs C code replacement.
..note:: Python API is too slow to add many routes, needs replacement.
"""
def config_fib_many_to_one(self, start_dest_addr, next_hop_addr, count):
@@ -221,8 +222,9 @@ class TestIPv4FibCrud(VppTestCase):
:return list: added ips with 32 prefix
"""
added_ips = []
dest_addr = int(
socket.inet_pton(socket.AF_INET, start_dest_addr).encode('hex'), 16)
dest_addr = int(socket.inet_pton(socket.AF_INET,
start_dest_addr).encode('hex'),
16)
dest_addr_len = 32
n_next_hop_addr = socket.inet_pton(socket.AF_INET, next_hop_addr)
for _ in range(count):
@@ -236,8 +238,9 @@ class TestIPv4FibCrud(VppTestCase):
def unconfig_fib_many_to_one(self, start_dest_addr, next_hop_addr, count):
removed_ips = []
dest_addr = int(
socket.inet_pton(socket.AF_INET, start_dest_addr).encode('hex'), 16)
dest_addr = int(socket.inet_pton(socket.AF_INET,
start_dest_addr).encode('hex'),
16)
dest_addr_len = 32
n_next_hop_addr = socket.inet_pton(socket.AF_INET, next_hop_addr)
for _ in range(count):
+6 -5
View File
@@ -95,7 +95,8 @@ class TestIp4VrfMultiInst(VppTestCase):
try:
# Create pg interfaces
cls.create_pg_interfaces(range(cls.nr_of_vrfs * cls.pg_ifs_per_vrf))
cls.create_pg_interfaces(
range(cls.nr_of_vrfs * cls.pg_ifs_per_vrf))
# Packet flows mapping pg0 -> pg1, pg2 etc.
cls.flows = dict()
@@ -308,14 +309,14 @@ class TestIp4VrfMultiInst(VppTestCase):
"""
Create packet streams for all configured l2-pg interfaces, send all
prepared packet streams and verify that:
- all packets received correctly on all pg-l2 interfaces assigned to
bridge domains
- all packets received correctly on all pg-l2 interfaces assigned
to bridge domains
- no packet received on all pg-l2 interfaces not assigned to bridge
domains
:raise RuntimeError: If no packet captured on l2-pg interface assigned
to the bridge domain or if any packet is captured on l2-pg interface
not assigned to the bridge domain.
to the bridge domain or if any packet is captured on l2-pg
interface not assigned to the bridge domain.
"""
# Test
# Create incoming packet streams for packet-generator interfaces
+30 -27
View File
@@ -8,8 +8,8 @@ from vpp_sub_interface import VppSubInterface, VppDot1QSubint
from scapy.packet import Raw
from scapy.layers.l2 import Ether, Dot1Q
from scapy.layers.inet6 import IPv6, UDP, ICMPv6ND_NS, ICMPv6ND_RS, ICMPv6ND_RA, \
ICMPv6NDOptSrcLLAddr, getmacbyip6, ICMPv6MRD_Solicitation
from scapy.layers.inet6 import IPv6, UDP, ICMPv6ND_NS, ICMPv6ND_RS, \
ICMPv6ND_RA, ICMPv6NDOptSrcLLAddr, getmacbyip6, ICMPv6MRD_Solicitation
from util import ppp
from scapy.utils6 import in6_getnsma, in6_getnsmac, in6_ptop, in6_islladdr, \
in6_mactoifaceid
@@ -172,8 +172,9 @@ class TestIPv6(VppTestCase):
payload_info = self.payload_to_info(str(packet[Raw]))
packet_index = payload_info.index
self.assertEqual(payload_info.dst, dst_sw_if_index)
self.logger.debug("Got packet on port %s: src=%u (id=%u)" %
(dst_if.name, payload_info.src, packet_index))
self.logger.debug(
"Got packet on port %s: src=%u (id=%u)" %
(dst_if.name, payload_info.src, packet_index))
next_info = self.get_next_packet_info_for_interface2(
payload_info.src, dst_sw_if_index,
last_info[payload_info.src])
@@ -229,9 +230,9 @@ class TestIPv6(VppTestCase):
intf.assert_nothing_captured(remark=remark)
def test_ns(self):
""" IPv6 Neighbour Soliciatation Exceptions
""" IPv6 Neighbour Solicitation Exceptions
Test sceanrio:
Test scenario:
- Send an NS Sourced from an address not covered by the link sub-net
- Send an NS to an mcast address the router has not joined
- Send NS for a target address the router does not onn.
@@ -249,12 +250,13 @@ class TestIPv6(VppTestCase):
ICMPv6NDOptSrcLLAddr(lladdr=self.pg0.remote_mac))
pkts = [p]
self.send_and_assert_no_replies(self.pg0, pkts,
"No response to NS source by address not on sub-net")
self.send_and_assert_no_replies(
self.pg0, pkts,
"No response to NS source by address not on sub-net")
#
# An NS for sent to a solicited mcast group the router is not a member of
# FAILS
# An NS for sent to a solicited mcast group the router is
# not a member of FAILS
#
if 0:
nsma = in6_getnsma(inet_pton(socket.AF_INET6, "fd::ffff"))
@@ -266,8 +268,9 @@ class TestIPv6(VppTestCase):
ICMPv6NDOptSrcLLAddr(lladdr=self.pg0.remote_mac))
pkts = [p]
self.send_and_assert_no_replies(self.pg0, pkts,
"No response to NS sent to unjoined mcast address")
self.send_and_assert_no_replies(
self.pg0, pkts,
"No response to NS sent to unjoined mcast address")
#
# An NS whose target address is one the router does not own
@@ -307,15 +310,15 @@ class TestIPv6(VppTestCase):
in6_ptop(mk_ll_addr(intf.local_mac)))
def test_rs(self):
""" IPv6 Router Soliciatation Exceptions
""" IPv6 Router Solicitation Exceptions
Test sceanrio:
Test scenario:
"""
#
# Before we begin change the IPv6 RA responses to use the unicast address
# that way we will not confuse them with the periodic Ras which go to the Mcast
# address
# Before we begin change the IPv6 RA responses to use the unicast
# address - that way we will not confuse them with the periodic
# RAs which go to the mcast address
#
self.pg0.ip6_ra_config(send_unicast=1)
@@ -336,8 +339,8 @@ class TestIPv6(VppTestCase):
#
# When we reconfiure the IPv6 RA config, we reset the RA rate limiting,
# so we need to do this before each test below so as not to drop packets for
# rate limiting reasons. Test this works here.
# so we need to do this before each test below so as not to drop
# packets for rate limiting reasons. Test this works here.
#
self.pg0.ip6_ra_config(send_unicast=1)
self.send_and_expect_ra(self.pg0, pkts, "Rate limit reset RS")
@@ -366,12 +369,12 @@ class TestIPv6(VppTestCase):
self.pg0, pkts, "RS sourced from link-local", src_ip=ll)
#
# Source from the unspecified address ::. This happens when the RS is sent before
# the host has a configured address/sub-net, i.e. auto-config.
# Since the sender has no IP address, the reply comes back mcast - so the
# capture needs to not filter this.
# If we happen to pick up the periodic RA at this point then so be it, it's not
# an error.
# Source from the unspecified address ::. This happens when the RS
# is sent before the host has a configured address/sub-net,
# i.e. auto-config. Since the sender has no IP address, the reply
# comes back mcast - so the capture needs to not filter this.
# If we happen to pick up the periodic RA at this point then so be it,
# it's not an error.
#
self.pg0.ip6_ra_config(send_unicast=1)
p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
@@ -399,9 +402,9 @@ class TestIPv6(VppTestCase):
@unittest.skip("Unsupported")
def test_mrs(self):
""" IPv6 Multicast Router Soliciatation Exceptions
""" IPv6 Multicast Router Solicitation Exceptions
Test sceanrio:
Test scenario:
"""
#
+6 -5
View File
@@ -106,7 +106,8 @@ class TestL2fib(VppTestCase):
# Create BD with MAC learning and unknown unicast flooding disabled
# and put interfaces to this BD
cls.vapi.bridge_domain_add_del(bd_id=cls.bd_id, uu_flood=0, learn=0)
cls.vapi.bridge_domain_add_del(
bd_id=cls.bd_id, uu_flood=0, learn=0)
for pg_if in cls.pg_interfaces:
cls.vapi.sw_interface_set_l2_bridge(pg_if.sw_if_index,
bd_id=cls.bd_id)
@@ -180,8 +181,8 @@ class TestL2fib(VppTestCase):
end_nr = start + count / n_int
for j in range(start, end_nr):
host = self.hosts_by_pg_idx[pg_if.sw_if_index][j]
self.vapi.l2fib_add_del(host.mac, self.bd_id, pg_if.sw_if_index,
static_mac=1)
self.vapi.l2fib_add_del(
host.mac, self.bd_id, pg_if.sw_if_index, static_mac=1)
counter += 1
percentage = counter / count * 100
if percentage > percent:
@@ -202,8 +203,8 @@ class TestL2fib(VppTestCase):
for pg_if in self.pg_interfaces:
for j in range(count / n_int):
host = self.hosts_by_pg_idx[pg_if.sw_if_index][0]
self.vapi.l2fib_add_del(host.mac, self.bd_id, pg_if.sw_if_index,
is_add=0)
self.vapi.l2fib_add_del(
host.mac, self.bd_id, pg_if.sw_if_index, is_add=0)
self.deleted_hosts_by_pg_idx[pg_if.sw_if_index].append(host)
del self.hosts_by_pg_idx[pg_if.sw_if_index][0]
counter += 1
+7 -6
View File
@@ -95,10 +95,10 @@ class TestL2bdMultiInst(VppTestCase):
for i in range(0, len(cls.pg_interfaces), 3):
cls.flows[cls.pg_interfaces[i]] = [cls.pg_interfaces[i + 1],
cls.pg_interfaces[i + 2]]
cls.flows[cls.pg_interfaces[i + 1]] = [cls.pg_interfaces[i],
cls.pg_interfaces[i + 2]]
cls.flows[cls.pg_interfaces[i + 2]] = [cls.pg_interfaces[i],
cls.pg_interfaces[i + 1]]
cls.flows[cls.pg_interfaces[i + 1]] = \
[cls.pg_interfaces[i], cls.pg_interfaces[i + 2]]
cls.flows[cls.pg_interfaces[i + 2]] = \
[cls.pg_interfaces[i], cls.pg_interfaces[i + 1]]
# Mapping between packet-generator index and lists of test hosts
cls.hosts_by_pg_idx = dict()
@@ -172,8 +172,9 @@ class TestL2bdMultiInst(VppTestCase):
def create_bd_and_mac_learn(self, count, start=1):
"""
Create required number of bridge domains with MAC learning enabled, put
3 l2-pg interfaces to every bridge domain and send MAC learning packets.
Create required number of bridge domains with MAC learning enabled,
put 3 l2-pg interfaces to every bridge domain and send MAC learning
packets.
:param int count: Number of bridge domains to be created.
:param int start: Starting number of the bridge domain ID.
+12 -8
View File
@@ -2,10 +2,10 @@
"""L2XC Multi-instance Test Case HLD:
**NOTES:**
- higher number (more than 15) of pg-l2 interfaces causes problems => only \
14 pg-l2 interfaces and 10 cross-connects are tested
- jumbo packets in configuration with 14 l2-pg interfaces leads to \
problems too
- higher number (more than 15) of pg-l2 interfaces causes problems => only
14 pg-l2 interfaces and 10 cross-connects are tested
- jumbo packets in configuration with 14 l2-pg interfaces leads to
problems too
**config 1**
- add 14 pg-l2 interfaces
@@ -15,7 +15,8 @@
- send L2 MAC frames between all pairs of pg-l2 interfaces
**verify 1**
- all packets received correctly in case of cross-connected l2-pg interfaces
- all packets received correctly in case of cross-connected l2-pg
interfaces
- no packet received in case of not cross-connected l2-pg interfaces
**config 2**
@@ -25,7 +26,8 @@
- send L2 MAC frames between all pairs of pg-l2 interfaces
**verify 2**
- all packets received correctly in case of cross-connected l2-pg interfaces
- all packets received correctly in case of cross-connected l2-pg
interfaces
- no packet received in case of not cross-connected l2-pg interfaces
**config 3**
@@ -35,7 +37,8 @@
- send L2 MAC frames between all pairs of pg-l2 interfaces
**verify 3**
- all packets received correctly in case of cross-connected l2-pg interfaces
- all packets received correctly in case of cross-connected l2-pg
interfaces
- no packet received in case of not cross-connected l2-pg interfaces
**config 4**
@@ -79,7 +82,8 @@ class TestL2xcMultiInst(VppTestCase):
cls.flows = dict()
for i in range(len(cls.pg_interfaces)):
delta = 1 if i % 2 == 0 else -1
cls.flows[cls.pg_interfaces[i]] = [cls.pg_interfaces[i + delta]]
cls.flows[cls.pg_interfaces[i]] =\
[cls.pg_interfaces[i + delta]]
# Mapping between packet-generator index and lists of test hosts
cls.hosts_by_pg_idx = dict()
+15 -5
View File
@@ -130,13 +130,15 @@ class TestSNAT(VppTestCase):
if same_port:
self.assertEqual(packet[TCP].sport, self.tcp_port_in)
else:
self.assertNotEqual(packet[TCP].sport, self.tcp_port_in)
self.assertNotEqual(
packet[TCP].sport, self.tcp_port_in)
self.tcp_port_out = packet[TCP].sport
elif packet.haslayer(UDP):
if same_port:
self.assertEqual(packet[UDP].sport, self.udp_port_in)
else:
self.assertNotEqual(packet[UDP].sport, self.udp_port_in)
self.assertNotEqual(
packet[UDP].sport, self.udp_port_in)
self.udp_port_out = packet[UDP].sport
else:
if same_port:
@@ -215,8 +217,14 @@ class TestSNAT(VppTestCase):
addr_only = 0
l_ip = socket.inet_pton(socket.AF_INET, local_ip)
e_ip = socket.inet_pton(socket.AF_INET, external_ip)
self.vapi.snat_add_static_mapping(l_ip, e_ip, local_port, external_port,
addr_only, vrf_id, is_add)
self.vapi.snat_add_static_mapping(
l_ip,
e_ip,
local_port,
external_port,
addr_only,
vrf_id,
is_add)
def snat_add_address(self, ip, is_add=1):
"""
@@ -413,7 +421,9 @@ class TestSNAT(VppTestCase):
self.pg3.assert_nothing_captured()
def test_multiple_inside_interfaces(self):
"""SNAT multiple inside interfaces with non-overlapping address space"""
"""
SNAT multiple inside interfaces with non-overlapping address space
"""
self.snat_add_address(self.snat_addr)
self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
+6 -5
View File
@@ -102,11 +102,12 @@ class TestSpan(VppTestCase):
for i in self.interfaces:
last_info[i.sw_if_index] = None
dst_sw_if_index = dst_if.sw_if_index
if len(capture_pg1) != len(capture_pg2):
self.logger.error(
"Different number of outgoing and mirrored packets : %u != %u" %
(len(capture_pg1), len(capture_pg2)))
raise
self.AssertEqual(
len(capture_pg1),
len(capture_pg2),
"Different number of outgoing and mirrored packets : %u != %u" %
(len(capture_pg1),
len(capture_pg2)))
for pkt_pg1, pkt_pg2 in zip(capture_pg1, capture_pg2):
try:
ip1 = pkt_pg1[IP]
+9 -5
View File
@@ -15,7 +15,7 @@ class VppInterface(object):
@property
def remote_mac(self):
"""MAC-address of the remote interface "connected" to this interface."""
"""MAC-address of the remote interface "connected" to this interface"""
return self._remote_hosts[0].mac
@property
@@ -261,19 +261,23 @@ class VppInterface(object):
def admin_up(self):
"""Put interface ADMIN-UP."""
self.test.vapi.sw_interface_set_flags(self.sw_if_index, admin_up_down=1)
self.test.vapi.sw_interface_set_flags(self.sw_if_index,
admin_up_down=1)
def admin_down(self):
"""Put interface ADMIN-down."""
self.test.vapi.sw_interface_set_flags(self.sw_if_index, admin_up_down=0)
self.test.vapi.sw_interface_set_flags(self.sw_if_index,
admin_up_down=0)
def ip6_enable(self):
"""IPv6 Enable interface"""
self.test.vapi.ip6_sw_interface_enable_disable(self.sw_if_index, enable=1)
self.test.vapi.ip6_sw_interface_enable_disable(self.sw_if_index,
enable=1)
def ip6_disable(self):
"""Put interface ADMIN-DOWN."""
self.test.vapi.ip6_sw_interface_enable_disable(self.sw_if_index, enable=0)
self.test.vapi.ip6_sw_interface_enable_disable(self.sw_if_index,
enable=0)
def add_sub_if(self, sub_if):
"""Register a sub-interface with this interface.
+30 -22
View File
@@ -13,7 +13,13 @@ MPLS_LABEL_INVALID = MPLS_IETF_MAX_LABEL + 1
class RoutePath:
def __init__(self, nh_addr, nh_sw_if_index, nh_table_id=0, labels=[], nh_via_label=MPLS_LABEL_INVALID):
def __init__(
self,
nh_addr,
nh_sw_if_index,
nh_table_id=0,
labels=[],
nh_via_label=MPLS_LABEL_INVALID):
self.nh_addr = socket.inet_pton(socket.AF_INET, nh_addr)
self.nh_itf = nh_sw_if_index
self.nh_table_id = nh_table_id
@@ -36,15 +42,16 @@ class IpRoute:
def add_vpp_config(self):
for path in self.paths:
self._test.vapi.ip_add_del_route(self.dest_addr,
self.dest_addr_len,
path.nh_addr,
path.nh_itf,
table_id=self.table_id,
next_hop_out_label_stack=path.nh_labels,
next_hop_n_out_labels=len(
path.nh_labels),
next_hop_via_label=path.nh_via_label)
self._test.vapi.ip_add_del_route(
self.dest_addr,
self.dest_addr_len,
path.nh_addr,
path.nh_itf,
table_id=self.table_id,
next_hop_out_label_stack=path.nh_labels,
next_hop_n_out_labels=len(
path.nh_labels),
next_hop_via_label=path.nh_via_label)
def remove_vpp_config(self):
for path in self.paths:
@@ -61,7 +68,7 @@ class MplsIpBind:
MPLS to IP Binding
"""
def __init__(self, test, local_label, dest_addr, dest_addr_len):
def __init__(self, test, local_label, dest_addr, dest_addr_len):
self._test = test
self.dest_addr = socket.inet_pton(socket.AF_INET, dest_addr)
self.dest_addr_len = dest_addr_len
@@ -93,17 +100,18 @@ class MplsRoute:
def add_vpp_config(self):
for path in self.paths:
self._test.vapi.mpls_route_add_del(self.local_label,
self.eos_bit,
1,
path.nh_addr,
path.nh_itf,
table_id=self.table_id,
next_hop_out_label_stack=path.nh_labels,
next_hop_n_out_labels=len(
path.nh_labels),
next_hop_via_label=path.nh_via_label,
next_hop_table_id=path.nh_table_id)
self._test.vapi.mpls_route_add_del(
self.local_label,
self.eos_bit,
1,
path.nh_addr,
path.nh_itf,
table_id=self.table_id,
next_hop_out_label_stack=path.nh_labels,
next_hop_n_out_labels=len(
path.nh_labels),
next_hop_via_label=path.nh_via_label,
next_hop_table_id=path.nh_table_id)
def remove_vpp_config(self):
for path in self.paths:
+6 -6
View File
@@ -5,9 +5,9 @@ from hook import Hook
from collections import deque
# Sphinx creates auto-generated documentation by importing the python source
# files and collecting the docstrings from them. The NO_VPP_PAPI flag allows the
# vpp_papi_provider.py file to be importable without having to build the whole
# vpp api if the user only wishes to generate the test documentation.
# files and collecting the docstrings from them. The NO_VPP_PAPI flag allows
# the vpp_papi_provider.py file to be importable without having to build
# the whole vpp api if the user only wishes to generate the test documentation.
do_import = True
try:
no_vpp_papi = os.getenv("NO_VPP_PAPI")
@@ -224,8 +224,8 @@ class VppPapiProvider(object):
send_unicast,):
return self.api(self.papi.sw_interface_ip6nd_ra_config,
{'sw_if_index': sw_if_index,
'suppress' : suppress,
'send_unicast' : send_unicast})
'suppress': suppress,
'send_unicast': send_unicast})
def ip6_sw_interface_enable_disable(self, sw_if_index, enable):
"""
@@ -972,7 +972,7 @@ class VppPapiProvider(object):
"""
:param is_add:
:param mask:
:param match_n_vectors: (Default value = 1):
:param match_n_vectors: (Default value = 1)
:param table_index: (Default value = 0xFFFFFFFF)
:param nbuckets: (Default value = 2)
:param memory_size: (Default value = 2097152)
+26 -22
View File
@@ -13,6 +13,7 @@ from util import ppp, ppc
from scapy.utils6 import in6_getnsma, in6_getnsmac, in6_ismaddr
from scapy.utils import inet_pton, inet_ntop
def is_ipv6_misc(p):
""" Is packet one of uninteresting IPv6 broadcasts? """
if p.haslayer(ICMPv6ND_RA):
@@ -91,10 +92,12 @@ class VppPGInterface(VppInterface):
self._capture_cli = "packet-generator capture pg%u pcap %s" % (
self.pg_index, self.out_path)
self._cap_name = "pcap%u" % self.sw_if_index
self._input_cli = "packet-generator new pcap %s source pg%u name %s" % (
self.in_path, self.pg_index, self.cap_name)
self._input_cli = \
"packet-generator new pcap %s source pg%u name %s" % (
self.in_path, self.pg_index, self.cap_name)
def rotate_out_file(self):
def enable_capture(self):
""" Enable capture on this packet-generator interface"""
try:
if os.path.isfile(self.out_path):
os.rename(self.out_path,
@@ -106,10 +109,6 @@ class VppPGInterface(VppInterface):
self._out_file))
except:
pass
def enable_capture(self):
""" Enable capture on this packet-generator interface"""
self.rotate_out_file()
# FIXME this should be an API, but no such exists atm
self.test.vapi.cli(self.capture_cli)
self._pcap_reader = None
@@ -151,7 +150,7 @@ class VppPGInterface(VppInterface):
before = len(output.res)
if filter_out_fn:
output.res = [p for p in output.res if not filter_out_fn(p)]
removed = before - len(output.res)
removed = len(output.res) - before
if removed:
self.test.logger.debug(
"Filtered out %s packets from capture (returning %s)" %
@@ -181,9 +180,13 @@ class VppPGInterface(VppInterface):
based_on = "based on stored packet_infos"
if expected_count == 0:
raise Exception(
"Internal error, expected packet count for %s is 0!" % name)
"Internal error, expected packet count for %s is 0!" %
name)
self.test.logger.debug("Expecting to capture %s (%s) packets on %s" % (
expected_count, based_on, name))
if expected_count == 0:
raise Exception(
"Internal error, expected packet count for %s is 0!" % name)
while remaining_time > 0:
before = time.time()
capture = self._get_capture(remaining_time, filter_out_fn)
@@ -192,8 +195,6 @@ class VppPGInterface(VppInterface):
if len(capture.res) == expected_count:
# bingo, got the packets we expected
return capture
elif expected_count == 0:
return None
remaining_time -= elapsed_time
if capture:
raise Exception("Captured packets mismatch, captured %s packets, "
@@ -241,8 +242,9 @@ class VppPGInterface(VppInterface):
self.test.logger.debug("Waiting for capture file %s to appear, "
"timeout is %ss" % (self.out_path, timeout))
else:
self.test.logger.debug("Capture file %s already exists" %
self.out_path)
self.test.logger.debug(
"Capture file %s already exists" %
self.out_path)
return True
while time.time() < limit:
if os.path.isfile(self.out_path):
@@ -275,8 +277,10 @@ class VppPGInterface(VppInterface):
self._pcap_reader = PcapReader(self.out_path)
break
except:
self.test.logger.debug("Exception in scapy.PcapReader(%s): "
"%s" % (self.out_path, format_exc()))
self.test.logger.debug(
"Exception in scapy.PcapReader(%s): "
"%s" %
(self.out_path, format_exc()))
if not self._pcap_reader:
raise Exception("Capture file %s did not appear within "
"timeout" % self.out_path)
@@ -290,8 +294,9 @@ class VppPGInterface(VppInterface):
"Packet received after %ss was filtered out" %
(time.time() - (deadline - timeout)))
else:
self.test.logger.debug("Packet received after %fs" %
(time.time() - (deadline - timeout)))
self.test.logger.debug(
"Packet received after %fs" %
(time.time() - (deadline - timeout)))
return p
time.sleep(0) # yield
self.test.logger.debug("Timeout - no packets received")
@@ -335,7 +340,6 @@ class VppPGInterface(VppInterface):
self.test.logger.info("No ARP received on port %s" %
pg_interface.name)
return
self.rotate_out_file()
arp_reply = captured_packet.copy() # keep original for exception
# Make Dot1AD packet content recognizable to scapy
if arp_reply.type == 0x88a8:
@@ -380,7 +384,8 @@ class VppPGInterface(VppInterface):
captured_packet = pg_interface.wait_for_packet(
deadline - now, filter_out_fn=None)
except:
self.test.logger.error("Timeout while waiting for NDP response")
self.test.logger.error(
"Timeout while waiting for NDP response")
raise
ndp_reply = captured_packet.copy() # keep original for exception
# Make Dot1AD packet content recognizable to scapy
@@ -395,13 +400,12 @@ class VppPGInterface(VppInterface):
self._local_mac = opt.lladdr
self.test.logger.debug(self.test.vapi.cli("show trace"))
# we now have the MAC we've been after
self.rotate_out_file()
return
except:
self.test.logger.info(
ppp("Unexpected response to NDP request:", captured_packet))
ppp("Unexpected response to NDP request:",
captured_packet))
now = time.time()
self.test.logger.debug(self.test.vapi.cli("show trace"))
self.rotate_out_file()
raise Exception("Timeout while waiting for NDP response")