Tests to target holes in adjacency and DPO test coverage

Change-Id: Ic6ac7e441a7b75baa02f03c1585d1ae00903a399
Signed-off-by: Neale Ranns <nranns@cisco.com>
This commit is contained in:
Neale Ranns
2017-02-21 17:30:26 -08:00
parent 794813fda1
commit 37be73693a
7 changed files with 284 additions and 16 deletions

View File

@@ -241,7 +241,9 @@ gre_update_adj (vnet_main_t * vnm,
adj_index_t ai)
{
adj_nbr_midchain_update_rewrite (ai, gre_fixup,
ADJ_MIDCHAIN_FLAG_NONE,
(VNET_LINK_ETHERNET == adj_get_link_type (ai) ?
ADJ_MIDCHAIN_FLAG_NO_COUNT :
ADJ_MIDCHAIN_FLAG_NONE),
gre_build_rewrite(vnm, sw_if_index,
adj_get_link_type(ai),
NULL));

View File

@@ -5,10 +5,11 @@ import unittest
from framework import VppTestCase, VppTestRunner
from vpp_sub_interface import VppSubInterface, VppDot1QSubint, VppDot1ADSubint
from vpp_ip_route import VppIpRoute, VppRoutePath
from scapy.packet import Raw
from scapy.layers.l2 import Ether, Dot1Q
from scapy.layers.inet import IP, UDP
from scapy.layers.inet import IP, UDP, ICMP, icmptypes, icmpcodes
from util import ppp
@@ -465,5 +466,85 @@ class TestIPv4FibCrud(VppTestCase):
self.verify_not_in_route_dump(fib_dump, self.deleted_routes)
class TestIPNull(VppTestCase):
""" IPv4 routes via NULL """
def setUp(self):
super(TestIPNull, self).setUp()
# create 2 pg interfaces
self.create_pg_interfaces(range(1))
for i in self.pg_interfaces:
i.admin_up()
i.config_ip4()
i.resolve_arp()
def tearDown(self):
super(TestIPNull, self).tearDown()
for i in self.pg_interfaces:
i.unconfig_ip4()
i.admin_down()
def test_ip_null(self):
""" IP NULL route """
#
# A route via IP NULL that will reply with ICMP unreachables
#
ip_unreach = VppIpRoute(self, "10.0.0.1", 32, [], is_unreach=1)
ip_unreach.add_vpp_config()
p_unreach = (Ether(src=self.pg0.remote_mac,
dst=self.pg0.local_mac) /
IP(src=self.pg0.remote_ip4, dst="10.0.0.1") /
UDP(sport=1234, dport=1234) /
Raw('\xa5' * 100))
self.pg0.add_stream(p_unreach)
self.pg_enable_capture(self.pg_interfaces)
self.pg_start()
rx = self.pg0.get_capture(1)
rx = rx[0]
icmp = rx[ICMP]
self.assertEqual(icmptypes[icmp.type], "dest-unreach")
self.assertEqual(icmpcodes[icmp.type][icmp.code], "host-unreachable")
self.assertEqual(icmp.src, self.pg0.remote_ip4)
self.assertEqual(icmp.dst, "10.0.0.1")
#
# ICMP replies are rate limited. so sit and spin.
#
self.sleep(1)
#
# A route via IP NULL that will reply with ICMP prohibited
#
ip_prohibit = VppIpRoute(self, "10.0.0.2", 32, [], is_prohibit=1)
ip_prohibit.add_vpp_config()
p_prohibit = (Ether(src=self.pg0.remote_mac,
dst=self.pg0.local_mac) /
IP(src=self.pg0.remote_ip4, dst="10.0.0.2") /
UDP(sport=1234, dport=1234) /
Raw('\xa5' * 100))
self.pg0.add_stream(p_prohibit)
self.pg_enable_capture(self.pg_interfaces)
self.pg_start()
rx = self.pg0.get_capture(1)
rx = rx[0]
icmp = rx[ICMP]
self.assertEqual(icmptypes[icmp.type], "dest-unreach")
self.assertEqual(icmpcodes[icmp.type][icmp.code], "host-prohibited")
self.assertEqual(icmp.src, self.pg0.remote_ip4)
self.assertEqual(icmp.dst, "10.0.0.2")
if __name__ == '__main__':
unittest.main(testRunner=VppTestRunner)

View File

@@ -7,13 +7,14 @@ from framework import VppTestCase, VppTestRunner
from vpp_sub_interface import VppSubInterface, VppDot1QSubint
from vpp_pg_interface import is_ipv6_misc
from vpp_neighbor import find_nbr
from vpp_ip_route import VppIpRoute, VppRoutePath
from scapy.packet import Raw
from scapy.layers.l2 import Ether, Dot1Q
from scapy.layers.inet6 import IPv6, UDP, ICMPv6ND_NS, ICMPv6ND_RS, \
ICMPv6ND_RA, ICMPv6NDOptSrcLLAddr, getmacbyip6, ICMPv6MRD_Solicitation, \
ICMPv6NDOptMTU, ICMPv6NDOptSrcLLAddr, ICMPv6NDOptPrefixInfo, \
ICMPv6ND_NA, ICMPv6NDOptDstLLAddr
ICMPv6ND_NA, ICMPv6NDOptDstLLAddr, ICMPv6DestUnreach, icmp6types
from util import ppp
from scapy.utils6 import in6_getnsma, in6_getnsmac, in6_ptop, in6_islladdr, \
@@ -888,5 +889,73 @@ class IPv6NDProxyTest(TestIPv6ND):
self.assertTrue(rx[0].haslayer(ICMPv6ND_NS))
class TestIPNull(VppTestCase):
""" IPv6 routes via NULL """
def setUp(self):
super(TestIPNull, self).setUp()
# create 2 pg interfaces
self.create_pg_interfaces(range(1))
for i in self.pg_interfaces:
i.admin_up()
i.config_ip6()
i.resolve_ndp()
def tearDown(self):
super(TestIPNull, self).tearDown()
for i in self.pg_interfaces:
i.unconfig_ip6()
i.admin_down()
def test_ip_null(self):
""" IP NULL route """
p = (Ether(src=self.pg0.remote_mac,
dst=self.pg0.local_mac) /
IPv6(src=self.pg0.remote_ip6, dst="2001::1") /
UDP(sport=1234, dport=1234) /
Raw('\xa5' * 100))
#
# A route via IP NULL that will reply with ICMP unreachables
#
ip_unreach = VppIpRoute(self, "2001::", 64, [], is_unreach=1, is_ip6=1)
ip_unreach.add_vpp_config()
self.pg0.add_stream(p)
self.pg_enable_capture(self.pg_interfaces)
self.pg_start()
rx = self.pg0.get_capture(1)
rx = rx[0]
icmp = rx[ICMPv6DestUnreach]
# 0 = "No route to destination"
self.assertEqual(icmp.code, 0)
# ICMP is rate limited. pause a bit
self.sleep(1)
#
# A route via IP NULL that will reply with ICMP prohibited
#
ip_prohibit = VppIpRoute(self, "2001::1", 128, [],
is_prohibit=1, is_ip6=1)
ip_prohibit.add_vpp_config()
self.pg0.add_stream(p)
self.pg_enable_capture(self.pg_interfaces)
self.pg_start()
rx = self.pg0.get_capture(1)
rx = rx[0]
icmp = rx[ICMPv6DestUnreach]
# 1 = "Communication with destination administratively prohibited"
self.assertEqual(icmp.code, 1)
if __name__ == '__main__':
unittest.main(testRunner=VppTestRunner)

View File

@@ -59,8 +59,8 @@ class TestIPMcast(VppTestCase):
def setUp(self):
super(TestIPMcast, self).setUp()
# create 4 pg interfaces
self.create_pg_interfaces(range(4))
# create 8 pg interfaces
self.create_pg_interfaces(range(8))
# setup interfaces
for i in self.pg_interfaces:
@@ -176,7 +176,9 @@ class TestIPMcast(VppTestCase):
#
# A (*,G).
# one accepting interface, pg0, 3 forwarding interfaces
# one accepting interface, pg0, 7 forwarding interfaces
# many forwarding interfaces test the case where the replicare DPO
# needs to use extra cache lines for the buckets.
#
route_232_1_1_1 = VppIpMRoute(
self,
@@ -190,6 +192,14 @@ class TestIPMcast(VppTestCase):
VppMRoutePath(self.pg2.sw_if_index,
MRouteItfFlags.MFIB_ITF_FLAG_FORWARD),
VppMRoutePath(self.pg3.sw_if_index,
MRouteItfFlags.MFIB_ITF_FLAG_FORWARD),
VppMRoutePath(self.pg4.sw_if_index,
MRouteItfFlags.MFIB_ITF_FLAG_FORWARD),
VppMRoutePath(self.pg5.sw_if_index,
MRouteItfFlags.MFIB_ITF_FLAG_FORWARD),
VppMRoutePath(self.pg6.sw_if_index,
MRouteItfFlags.MFIB_ITF_FLAG_FORWARD),
VppMRoutePath(self.pg7.sw_if_index,
MRouteItfFlags.MFIB_ITF_FLAG_FORWARD)])
route_232_1_1_1.add_vpp_config()
@@ -235,9 +245,14 @@ class TestIPMcast(VppTestCase):
self.pg_enable_capture(self.pg_interfaces)
self.pg_start()
# We expect replications on Pg1, 2,
# We expect replications on Pg1->7
self.verify_capture_ip4(self.pg1, tx)
self.verify_capture_ip4(self.pg2, tx)
self.verify_capture_ip4(self.pg3, tx)
self.verify_capture_ip4(self.pg4, tx)
self.verify_capture_ip4(self.pg5, tx)
self.verify_capture_ip4(self.pg6, tx)
self.verify_capture_ip4(self.pg7, tx)
# no replications on Pg0
self.pg0.assert_nothing_captured(

View File

@@ -5,10 +5,12 @@ from socket import AF_INET, AF_INET6, inet_pton
from framework import VppTestCase, VppTestRunner
from vpp_neighbor import VppNeighbor, find_nbr
from vpp_ip_route import VppIpRoute, VppRoutePath
from scapy.packet import Raw
from scapy.layers.l2 import Ether, ARP
from scapy.layers.inet import IP, UDP
from scapy.contrib.mpls import MPLS
# not exported by scapy, so redefined here
arp_opts = {"who-has": 1, "is-at": 2}
@@ -88,6 +90,18 @@ class ARPTestCase(VppTestCase):
self.assertEqual(ip.src, sip)
self.assertEqual(ip.dst, dip)
def verify_ip_o_mpls(self, rx, smac, dmac, label, sip, dip):
ether = rx[Ether]
self.assertEqual(ether.dst, dmac)
self.assertEqual(ether.src, smac)
mpls = rx[MPLS]
self.assertTrue(mpls.label, label)
ip = rx[IP]
self.assertEqual(ip.src, sip)
self.assertEqual(ip.dst, dip)
def send_and_assert_no_replies(self, intf, pkts, remark):
intf.add_stream(pkts)
self.pg_enable_capture(self.pg_interfaces)
@@ -430,3 +444,70 @@ class ARPTestCase(VppTestCase):
"ARP req from disable")
self.send_and_assert_no_replies(self.pg2, arp_req_pg2,
"ARP req from disable")
#
# clean up on interface 2
#
self.pg2.set_unnumbered(self.pg1.sw_if_index)
def test_mpls(self):
""" MPLS """
#
# Interface 2 does not yet have ip4 config
#
self.pg2.config_ip4()
self.pg2.generate_remote_hosts(2)
#
# Add a reoute with out going label via an ARP unresolved next-hop
#
ip_10_0_0_1 = VppIpRoute(self, "10.0.0.1", 32,
[VppRoutePath(self.pg2.remote_hosts[1].ip4,
self.pg2.sw_if_index,
labels=[55])])
ip_10_0_0_1.add_vpp_config()
#
# packets should generate an ARP request
#
p = (Ether(src=self.pg0.remote_mac,
dst=self.pg0.local_mac) /
IP(src=self.pg0.remote_ip4, dst="10.0.0.1") /
UDP(sport=1234, dport=1234) /
Raw('\xa5' * 100))
self.pg0.add_stream(p)
self.pg_enable_capture(self.pg_interfaces)
self.pg_start()
rx = self.pg2.get_capture(1)
self.verify_arp_req(rx[0],
self.pg2.local_mac,
self.pg2.local_ip4,
self.pg2._remote_hosts[1].ip4)
#
# now resolve the neighbours
#
self.pg2.configure_ipv4_neighbors()
#
# Now packet should be properly MPLS encapped.
# This verifies that MPLS link-type adjacencies are completed
# when the ARP entry resolves
#
self.pg0.add_stream(p)
self.pg_enable_capture(self.pg_interfaces)
self.pg_start()
rx = self.pg2.get_capture(1)
self.verify_ip_o_mpls(rx[0],
self.pg2.local_mac,
self.pg2.remote_hosts[1].mac,
55,
self.pg0.remote_ip4,
"10.0.0.1")
if __name__ == '__main__':
unittest.main(testRunner=VppTestRunner)

View File

@@ -335,6 +335,13 @@ class VppInterface(object):
self.sw_if_index,
ip_sw_if_index)
def unset_unnumbered(self, ip_sw_if_index):
""" Unaet the interface to unnumbered via ip_sw_if_index """
self.test.vapi.sw_interface_set_unnumbered(
self.sw_if_index,
ip_sw_if_index,
is_add=0)
def set_proxy_arp(self, enable=1):
""" Set the interface to enable/disable Proxy ARP """
self.test.vapi.proxy_arp_intfc_enable_disable(

View File

@@ -46,26 +46,31 @@ class VppIpRoute(VppObject):
"""
def __init__(self, test, dest_addr,
dest_addr_len, paths, table_id=0, is_ip6=0, is_local=0):
dest_addr_len, paths, table_id=0, is_ip6=0, is_local=0,
is_unreach=0, is_prohibit=0):
self._test = test
self.paths = paths
self.dest_addr_len = dest_addr_len
self.table_id = table_id
self.is_ip6 = is_ip6
self.is_local = is_local
self.is_unreach = is_unreach
self.is_prohibit = is_prohibit
if is_ip6:
self.dest_addr = socket.inet_pton(socket.AF_INET6, dest_addr)
else:
self.dest_addr = socket.inet_pton(socket.AF_INET, dest_addr)
def add_vpp_config(self):
if self.is_local:
if self.is_local or self.is_unreach or self.is_prohibit:
self._test.vapi.ip_add_del_route(
self.dest_addr,
self.dest_addr_len,
socket.inet_pton(socket.AF_INET6, "::"),
0xffffffff,
is_local=1,
is_local=self.is_local,
is_unreach=self.is_unreach,
is_prohibit=self.is_prohibit,
table_id=self.table_id,
is_ipv6=self.is_ip6)
else:
@@ -84,13 +89,15 @@ class VppIpRoute(VppObject):
self._test.registry.register(self, self._test.logger)
def remove_vpp_config(self):
if self.is_local:
if self.is_local or self.is_unreach or self.is_prohibit:
self._test.vapi.ip_add_del_route(
self.dest_addr,
self.dest_addr_len,
socket.inet_pton(socket.AF_INET6, "::"),
0xffffffff,
is_local=1,
is_local=self.is_local,
is_unreach=self.is_unreach,
is_prohibit=self.is_prohibit,
is_add=0,
table_id=self.table_id,
is_ipv6=self.is_ip6)
@@ -116,10 +123,16 @@ class VppIpRoute(VppObject):
return self.object_id()
def object_id(self):
return ("%d:%s/%d"
% (self.table_id,
socket.inet_ntop(socket.AF_INET, self.dest_addr),
self.dest_addr_len))
if self.is_ip6:
return ("%d:%s/%d"
% (self.table_id,
socket.inet_ntop(socket.AF_INET6, self.dest_addr),
self.dest_addr_len))
else:
return ("%d:%s/%d"
% (self.table_id,
socket.inet_ntop(socket.AF_INET, self.dest_addr),
self.dest_addr_len))
class VppIpMRoute(VppObject):