vpp/test/test_neighbor.py
Steven Luong e4238aa34f ethernet: check destination mac for L3 in ethernet-input node
When the NIC does not support mac filter, we rely on ethernet-input
node to do the destination mac check, ie, when the interface is in L3,
the mac address for the packet must be the mac address of the
interface where the packet arrives. This works fine in ethernet-input
node when all packets in the frame might have different interfaces, ie,
ETH_INPUT_FRAME_F_SINGLE_SW_IF_ID is not set in the frame. However,
when all packets are having the same interface,
ETH_INPUT_FRAME_F_SINGLE_SW_IF_ID is set, ethernet-input node goes
through the optimized routine eth_input_single_int -> eth_input_process_frame.
That is where dmac check has a bug when all packets in the frame are
either, ip4, ip6, or mpls without vlan tags. Because without vlan tags,
the code handles all packets in fast path and ignores dmac check.
With vlan tags, the code goes to slow path where dmac check is handled
properly.

The fix is to check if we have a bad dmac in the fast path and force the
code to go to slow path which will handle dmac check properly.

Also do a wholesale correction on all the testcases which do not use
the proper dmac when sending L3 packets.

Type: fix

Change-Id: I73153a805cecdc24c4eefcc781676de04737ae2c
Signed-off-by: Steven Luong <sluong@cisco.com>
2024-05-08 09:42:23 +00:00

2837 lines
85 KiB
Python

#!/usr/bin/env python3
import unittest
import os
from framework import VppTestCase
from asfframework import VppTestRunner, tag_fixme_vpp_workers, tag_fixme_ubuntu2204
from vpp_neighbor import VppNeighbor, find_nbr
from vpp_ip_route import (
VppIpRoute,
VppRoutePath,
find_route,
VppIpTable,
DpoProto,
FibPathType,
VppIpInterfaceAddress,
)
from vpp_papi import VppEnum, MACAddress
from vpp_ip import VppIpPuntRedirect
from vpp_sub_interface import VppDot1ADSubint
from scapy.packet import Raw
from scapy.layers.l2 import Ether, ARP, Dot1Q
from scapy.layers.inet import IP, UDP, TCP
from scapy.layers.inet6 import IPv6
from scapy.contrib.mpls import MPLS
from scapy.layers.inet6 import IPv6
NUM_PKTS = 67
# not exported by scapy, so redefined here
arp_opts = {"who-has": 1, "is-at": 2}
class ARPTestCase(VppTestCase):
"""ARP Test Case"""
@classmethod
def setUpClass(cls):
super(ARPTestCase, cls).setUpClass()
@classmethod
def tearDownClass(cls):
super(ARPTestCase, cls).tearDownClass()
def setUp(self):
super(ARPTestCase, self).setUp()
# create 3 pg interfaces
self.create_pg_interfaces(range(4))
# pg0 configured with ip4 and 6 addresses used for input
# pg1 configured with ip4 and 6 addresses used for output
# pg2 is unnumbered to pg0
for i in self.pg_interfaces:
i.admin_up()
self.pg0.config_ip4()
self.pg0.config_ip6()
self.pg0.resolve_arp()
self.pg1.config_ip4()
self.pg1.config_ip6()
# pg3 in a different VRF
self.tbl = VppIpTable(self, 1)
self.tbl.add_vpp_config()
self.pg3.set_table_ip4(1)
self.pg3.config_ip4()
def tearDown(self):
self.pg0.unconfig_ip4()
self.pg0.unconfig_ip6()
self.pg1.unconfig_ip4()
self.pg1.unconfig_ip6()
self.pg3.unconfig_ip4()
self.pg3.set_table_ip4(0)
for i in self.pg_interfaces:
i.admin_down()
super(ARPTestCase, self).tearDown()
def verify_arp_req(self, rx, smac, sip, dip, etype=0x0806):
ether = rx[Ether]
self.assertEqual(ether.dst, "ff:ff:ff:ff:ff:ff")
self.assertEqual(ether.src, smac)
self.assertEqual(ether.type, etype)
arp = rx[ARP]
self.assertEqual(arp.hwtype, 1)
self.assertEqual(arp.ptype, 0x800)
self.assertEqual(arp.hwlen, 6)
self.assertEqual(arp.plen, 4)
self.assertEqual(arp.op, arp_opts["who-has"])
self.assertEqual(arp.hwsrc, smac)
self.assertEqual(arp.hwdst, "00:00:00:00:00:00")
self.assertEqual(arp.psrc, sip)
self.assertEqual(arp.pdst, dip)
def verify_arp_resp(self, rx, smac, dmac, sip, dip):
ether = rx[Ether]
self.assertEqual(ether.dst, dmac)
self.assertEqual(ether.src, smac)
self.assertEqual(ether.type, 0x0806)
arp = rx[ARP]
self.assertEqual(arp.hwtype, 1)
self.assertEqual(arp.ptype, 0x800)
self.assertEqual(arp.hwlen, 6)
self.assertEqual(arp.plen, 4)
self.assertEqual(arp.op, arp_opts["is-at"])
self.assertEqual(arp.hwsrc, smac)
self.assertEqual(arp.hwdst, dmac)
self.assertEqual(arp.psrc, sip)
self.assertEqual(arp.pdst, dip)
def verify_arp_vrrp_resp(self, rx, smac, dmac, sip, dip):
ether = rx[Ether]
self.assertEqual(ether.dst, dmac)
self.assertEqual(ether.src, smac)
arp = rx[ARP]
self.assertEqual(arp.hwtype, 1)
self.assertEqual(arp.ptype, 0x800)
self.assertEqual(arp.hwlen, 6)
self.assertEqual(arp.plen, 4)
self.assertEqual(arp.op, arp_opts["is-at"])
self.assertNotEqual(arp.hwsrc, smac)
self.assertTrue("00:00:5e:00:01" in arp.hwsrc or "00:00:5E:00:01" in arp.hwsrc)
self.assertEqual(arp.hwdst, dmac)
self.assertEqual(arp.psrc, sip)
self.assertEqual(arp.pdst, dip)
def verify_ip(self, rx, smac, dmac, sip, dip):
ether = rx[Ether]
self.assertEqual(ether.dst, dmac)
self.assertEqual(ether.src, smac)
self.assertEqual(ether.type, 0x0800)
ip = rx[IP]
self.assertEqual(ip.src, sip)
self.assertEqual(ip.dst, dip)
def verify_ip_o_mpls(self, rx, smac, dmac, label, sip, dip):
ether = rx[Ether]
self.assertEqual(ether.dst, dmac)
self.assertEqual(ether.src, smac)
self.assertEqual(ether.type, 0x8847)
mpls = rx[MPLS]
self.assertTrue(mpls.label, label)
ip = rx[IP]
self.assertEqual(ip.src, sip)
self.assertEqual(ip.dst, dip)
def get_arp_rx_requests(self, itf):
"""Get ARP RX request stats for and interface"""
return self.statistics["/net/arp/rx/requests"][:, itf.sw_if_index].sum()
def get_arp_tx_requests(self, itf):
"""Get ARP TX request stats for and interface"""
return self.statistics["/net/arp/tx/requests"][:, itf.sw_if_index].sum()
def get_arp_rx_replies(self, itf):
"""Get ARP RX replies stats for and interface"""
return self.statistics["/net/arp/rx/replies"][:, itf.sw_if_index].sum()
def get_arp_tx_replies(self, itf):
"""Get ARP TX replies stats for and interface"""
return self.statistics["/net/arp/tx/replies"][:, itf.sw_if_index].sum()
def get_arp_rx_garp(self, itf):
"""Get ARP RX grat stats for and interface"""
return self.statistics["/net/arp/rx/gratuitous"][:, itf.sw_if_index].sum()
def get_arp_tx_garp(self, itf):
"""Get ARP RX grat stats for and interface"""
return self.statistics["/net/arp/tx/gratuitous"][:, itf.sw_if_index].sum()
def test_arp(self):
"""ARP"""
#
# Generate some hosts on the LAN
#
self.pg1.generate_remote_hosts(11)
#
# watch for:
# - all neighbour events
# - all neighbor events on pg1
# - neighbor events for host[1] on pg1
#
self.vapi.want_ip_neighbor_events(enable=1, pid=os.getpid())
self.vapi.want_ip_neighbor_events(
enable=1, pid=os.getpid(), sw_if_index=self.pg1.sw_if_index
)
self.vapi.want_ip_neighbor_events(
enable=1,
pid=os.getpid(),
sw_if_index=self.pg1.sw_if_index,
ip=self.pg1.remote_hosts[1].ip4,
)
self.logger.info(self.vapi.cli("sh ip neighbor-watcher"))
#
# Send IP traffic to one of these unresolved hosts.
# expect the generation of an ARP request
#
p = (
Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
/ IP(src=self.pg0.remote_ip4, dst=self.pg1._remote_hosts[1].ip4)
/ UDP(sport=1234, dport=1234)
/ Raw()
)
self.pg0.add_stream(p)
self.pg_enable_capture(self.pg_interfaces)
self.pg_start()
rx = self.pg1.get_capture(1)
self.verify_arp_req(
rx[0], self.pg1.local_mac, self.pg1.local_ip4, self.pg1._remote_hosts[1].ip4
)
self.logger.info(self.vapi.cli("sh ip neighbor-stats"))
self.logger.info(self.vapi.cli("sh ip neighbor-stats pg1"))
self.assert_equal(self.get_arp_tx_requests(self.pg1), 1)
#
# And a dynamic ARP entry for host 1
#
dyn_arp = VppNeighbor(
self,
self.pg1.sw_if_index,
self.pg1.remote_hosts[1].mac,
self.pg1.remote_hosts[1].ip4,
)
dyn_arp.add_vpp_config()
self.assertTrue(dyn_arp.query_vpp_config())
self.logger.info(self.vapi.cli("show ip neighbor-watcher"))
# this matches all of the listnerers
es = [self.vapi.wait_for_event(1, "ip_neighbor_event") for i in range(3)]
for e in es:
self.assertEqual(str(e.neighbor.ip_address), self.pg1.remote_hosts[1].ip4)
#
# now we expect IP traffic forwarded
#
dyn_p = (
Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
/ IP(src=self.pg0.remote_ip4, dst=self.pg1._remote_hosts[1].ip4)
/ UDP(sport=1234, dport=1234)
/ Raw()
)
self.pg0.add_stream(dyn_p)
self.pg_enable_capture(self.pg_interfaces)
self.pg_start()
rx = self.pg1.get_capture(1)
self.verify_ip(
rx[0],
self.pg1.local_mac,
self.pg1.remote_hosts[1].mac,
self.pg0.remote_ip4,
self.pg1._remote_hosts[1].ip4,
)
#
# And a Static ARP entry for host 2
#
static_arp = VppNeighbor(
self,
self.pg1.sw_if_index,
self.pg1.remote_hosts[2].mac,
self.pg1.remote_hosts[2].ip4,
is_static=1,
)
static_arp.add_vpp_config()
es = [self.vapi.wait_for_event(1, "ip_neighbor_event") for i in range(2)]
for e in es:
self.assertEqual(str(e.neighbor.ip_address), self.pg1.remote_hosts[2].ip4)
static_p = (
Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
/ IP(src=self.pg0.remote_ip4, dst=self.pg1._remote_hosts[2].ip4)
/ UDP(sport=1234, dport=1234)
/ Raw()
)
self.pg0.add_stream(static_p)
self.pg_enable_capture(self.pg_interfaces)
self.pg_start()
rx = self.pg1.get_capture(1)
self.verify_ip(
rx[0],
self.pg1.local_mac,
self.pg1.remote_hosts[2].mac,
self.pg0.remote_ip4,
self.pg1._remote_hosts[2].ip4,
)
#
# remove all the listeners
#
self.vapi.want_ip_neighbor_events(enable=0, pid=os.getpid())
self.vapi.want_ip_neighbor_events(
enable=0, pid=os.getpid(), sw_if_index=self.pg1.sw_if_index
)
self.vapi.want_ip_neighbor_events(
enable=0,
pid=os.getpid(),
sw_if_index=self.pg1.sw_if_index,
ip=self.pg1.remote_hosts[1].ip4,
)
#
# flap the link. dynamic ARPs get flush, statics don't
#
self.pg1.admin_down()
self.pg1.admin_up()
self.pg0.add_stream(static_p)
self.pg_enable_capture(self.pg_interfaces)
self.pg_start()
rx = self.pg1.get_capture(1)
self.verify_ip(
rx[0],
self.pg1.local_mac,
self.pg1.remote_hosts[2].mac,
self.pg0.remote_ip4,
self.pg1._remote_hosts[2].ip4,
)
self.pg0.add_stream(dyn_p)
self.pg_enable_capture(self.pg_interfaces)
self.pg_start()
rx = self.pg1.get_capture(1)
self.verify_arp_req(
rx[0], self.pg1.local_mac, self.pg1.local_ip4, self.pg1._remote_hosts[1].ip4
)
self.assert_equal(self.get_arp_tx_requests(self.pg1), 2)
self.assertFalse(dyn_arp.query_vpp_config())
self.assertTrue(static_arp.query_vpp_config())
#
# Send an ARP request from one of the so-far unlearned remote hosts
#
p = Ether(dst="ff:ff:ff:ff:ff:ff", src=self.pg1._remote_hosts[3].mac) / ARP(
op="who-has",
hwsrc=self.pg1._remote_hosts[3].mac,
pdst=self.pg1.local_ip4,
psrc=self.pg1._remote_hosts[3].ip4,
)
self.pg1.add_stream(p)
self.pg_enable_capture(self.pg_interfaces)
self.pg_start()
rx = self.pg1.get_capture(1)
self.verify_arp_resp(
rx[0],
self.pg1.local_mac,
self.pg1._remote_hosts[3].mac,
self.pg1.local_ip4,
self.pg1._remote_hosts[3].ip4,
)
self.logger.info(self.vapi.cli("sh ip neighbor-stats pg1"))
self.assert_equal(self.get_arp_rx_requests(self.pg1), 1)
self.assert_equal(self.get_arp_tx_replies(self.pg1), 1)
#
# VPP should have learned the mapping for the remote host
#
self.assertTrue(
find_nbr(self, self.pg1.sw_if_index, self.pg1._remote_hosts[3].ip4)
)
#
# Fire in an ARP request before the interface becomes IP enabled
#
self.pg2.generate_remote_hosts(4)
p = Ether(dst="ff:ff:ff:ff:ff:ff", src=self.pg2.remote_mac) / ARP(
op="who-has",
hwsrc=self.pg2.remote_mac,
pdst=self.pg1.local_ip4,
psrc=self.pg2.remote_hosts[3].ip4,
)
pt = (
Ether(dst="ff:ff:ff:ff:ff:ff", src=self.pg2.remote_mac)
/ Dot1Q(vlan=0)
/ ARP(
op="who-has",
hwsrc=self.pg2.remote_mac,
pdst=self.pg1.local_ip4,
psrc=self.pg2.remote_hosts[3].ip4,
)
)
self.send_and_assert_no_replies(self.pg2, p, "interface not IP enabled")
#
# Make pg2 un-numbered to pg1
#
self.pg2.set_unnumbered(self.pg1.sw_if_index)
#
# test the unnumbered dump both by all interfaces and just the enabled
# one
#
unnum = self.vapi.ip_unnumbered_dump()
self.assertTrue(len(unnum))
self.assertEqual(unnum[0].ip_sw_if_index, self.pg1.sw_if_index)
self.assertEqual(unnum[0].sw_if_index, self.pg2.sw_if_index)
unnum = self.vapi.ip_unnumbered_dump(self.pg2.sw_if_index)
self.assertTrue(len(unnum))
self.assertEqual(unnum[0].ip_sw_if_index, self.pg1.sw_if_index)
self.assertEqual(unnum[0].sw_if_index, self.pg2.sw_if_index)
# Allow for ARP requests from point-to-point ethernet neighbors
# without an attached route on pg2
self.pg2.add_stream(p)
self.pg_enable_capture(self.pg_interfaces)
self.pg_start()
rx = self.pg2.get_capture(1)
self.verify_arp_resp(
rx[0],
self.pg2.local_mac,
self.pg2.remote_mac,
self.pg1.local_ip4,
self.pg2.remote_hosts[3].ip4,
)
#
# Allow for ARP requests from neighbors on unnumbered with
# an attached route on pg2
attached_host = VppIpRoute(
self,
self.pg2.remote_hosts[3].ip4,
32,
[VppRoutePath("0.0.0.0", self.pg2.sw_if_index)],
)
attached_host.add_vpp_config()
self.pg2.add_stream(p)
self.pg_enable_capture(self.pg_interfaces)
self.pg_start()
rx = self.pg2.get_capture(1)
self.verify_arp_resp(
rx[0],
self.pg2.local_mac,
self.pg2.remote_mac,
self.pg1.local_ip4,
self.pg2.remote_hosts[3].ip4,
)
self.pg2.add_stream(pt)
self.pg_enable_capture(self.pg_interfaces)
self.pg_start()
rx = self.pg2.get_capture(1)
self.verify_arp_resp(
rx[0],
self.pg2.local_mac,
self.pg2.remote_mac,
self.pg1.local_ip4,
self.pg2.remote_hosts[3].ip4,
)
#
# A neighbor entry that has no associated FIB-entry
#
arp_no_fib = VppNeighbor(
self,
self.pg1.sw_if_index,
self.pg1.remote_hosts[4].mac,
self.pg1.remote_hosts[4].ip4,
is_no_fib_entry=1,
)
arp_no_fib.add_vpp_config()
#
# check we have the neighbor, but no route
#
self.assertTrue(
find_nbr(self, self.pg1.sw_if_index, self.pg1._remote_hosts[4].ip4)
)
self.assertFalse(find_route(self, self.pg1._remote_hosts[4].ip4, 32))
#
# pg2 is unnumbered to pg1, so we can form adjacencies out of pg2
# from within pg1's subnet
#
arp_unnum = VppNeighbor(
self,
self.pg2.sw_if_index,
self.pg1.remote_hosts[5].mac,
self.pg1.remote_hosts[5].ip4,
)
arp_unnum.add_vpp_config()
p = (
Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
/ IP(src=self.pg0.remote_ip4, dst=self.pg1._remote_hosts[5].ip4)
/ UDP(sport=1234, dport=1234)
/ Raw()
)
self.pg0.add_stream(p)
self.pg_enable_capture(self.pg_interfaces)
self.pg_start()
rx = self.pg2.get_capture(1)
self.verify_ip(
rx[0],
self.pg2.local_mac,
self.pg1.remote_hosts[5].mac,
self.pg0.remote_ip4,
self.pg1._remote_hosts[5].ip4,
)
#
# ARP requests from hosts in pg1's subnet sent on pg2 are replied to
# with the unnumbered interface's address as the source
#
p = Ether(dst="ff:ff:ff:ff:ff:ff", src=self.pg2.remote_mac) / ARP(
op="who-has",
hwsrc=self.pg2.remote_mac,
pdst=self.pg1.local_ip4,
psrc=self.pg1.remote_hosts[6].ip4,
)
self.pg2.add_stream(p)
self.pg_enable_capture(self.pg_interfaces)
self.pg_start()
rx = self.pg2.get_capture(1)
self.verify_arp_resp(
rx[0],
self.pg2.local_mac,
self.pg2.remote_mac,
self.pg1.local_ip4,
self.pg1.remote_hosts[6].ip4,
)
#
# An attached host route out of pg2 for an undiscovered hosts generates
# an ARP request with the unnumbered address as the source
#
att_unnum = VppIpRoute(
self,
self.pg1.remote_hosts[7].ip4,
32,
[VppRoutePath("0.0.0.0", self.pg2.sw_if_index)],
)
att_unnum.add_vpp_config()
p = (
Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
/ IP(src=self.pg0.remote_ip4, dst=self.pg1._remote_hosts[7].ip4)
/ UDP(sport=1234, dport=1234)
/ Raw()
)
self.pg0.add_stream(p)
self.pg_enable_capture(self.pg_interfaces)
self.pg_start()
rx = self.pg2.get_capture(1)
self.verify_arp_req(
rx[0], self.pg2.local_mac, self.pg1.local_ip4, self.pg1._remote_hosts[7].ip4
)
p = Ether(dst="ff:ff:ff:ff:ff:ff", src=self.pg2.remote_mac) / ARP(
op="who-has",
hwsrc=self.pg2.remote_mac,
pdst=self.pg1.local_ip4,
psrc=self.pg1.remote_hosts[7].ip4,
)
self.pg2.add_stream(p)
self.pg_enable_capture(self.pg_interfaces)
self.pg_start()
rx = self.pg2.get_capture(1)
self.verify_arp_resp(
rx[0],
self.pg2.local_mac,
self.pg2.remote_mac,
self.pg1.local_ip4,
self.pg1.remote_hosts[7].ip4,
)
#
# An attached host route as yet unresolved out of pg2 for an
# undiscovered host, an ARP requests begets a response.
#
att_unnum1 = VppIpRoute(
self,
self.pg1.remote_hosts[8].ip4,
32,
[VppRoutePath("0.0.0.0", self.pg2.sw_if_index)],
)
att_unnum1.add_vpp_config()
p = Ether(dst="ff:ff:ff:ff:ff:ff", src=self.pg2.remote_mac) / ARP(
op="who-has",
hwsrc=self.pg2.remote_mac,
pdst=self.pg1.local_ip4,
psrc=self.pg1.remote_hosts[8].ip4,
)
self.pg2.add_stream(p)
self.pg_enable_capture(self.pg_interfaces)
self.pg_start()
rx = self.pg2.get_capture(1)
self.verify_arp_resp(
rx[0],
self.pg2.local_mac,
self.pg2.remote_mac,
self.pg1.local_ip4,
self.pg1.remote_hosts[8].ip4,
)
#
# Send an ARP request from one of the so-far unlearned remote hosts
# with a VLAN0 tag
#
p = (
Ether(dst="ff:ff:ff:ff:ff:ff", src=self.pg1._remote_hosts[9].mac)
/ Dot1Q(vlan=0)
/ ARP(
op="who-has",
hwsrc=self.pg1._remote_hosts[9].mac,
pdst=self.pg1.local_ip4,
psrc=self.pg1._remote_hosts[9].ip4,
)
)
self.pg1.add_stream(p)
self.pg_enable_capture(self.pg_interfaces)
self.pg_start()
rx = self.pg1.get_capture(1)
self.verify_arp_resp(
rx[0],
self.pg1.local_mac,
self.pg1._remote_hosts[9].mac,
self.pg1.local_ip4,
self.pg1._remote_hosts[9].ip4,
)
#
# Add a hierarchy of routes for a host in the sub-net.
# Should still get an ARP resp since the cover is attached
#
p = Ether(dst="ff:ff:ff:ff:ff:ff", src=self.pg1.remote_mac) / ARP(
op="who-has",
hwsrc=self.pg1.remote_mac,
pdst=self.pg1.local_ip4,
psrc=self.pg1.remote_hosts[10].ip4,
)
r1 = VppIpRoute(
self,
self.pg1.remote_hosts[10].ip4,
30,
[VppRoutePath(self.pg1.remote_hosts[10].ip4, self.pg1.sw_if_index)],
)
r1.add_vpp_config()
self.pg1.add_stream(p)
self.pg_enable_capture(self.pg_interfaces)
self.pg_start()
rx = self.pg1.get_capture(1)
self.verify_arp_resp(
rx[0],
self.pg1.local_mac,
self.pg1.remote_mac,
self.pg1.local_ip4,
self.pg1.remote_hosts[10].ip4,
)
r2 = VppIpRoute(
self,
self.pg1.remote_hosts[10].ip4,
32,
[VppRoutePath(self.pg1.remote_hosts[10].ip4, self.pg1.sw_if_index)],
)
r2.add_vpp_config()
self.pg1.add_stream(p)
self.pg_enable_capture(self.pg_interfaces)
self.pg_start()
rx = self.pg1.get_capture(1)
self.verify_arp_resp(
rx[0],
self.pg1.local_mac,
self.pg1.remote_mac,
self.pg1.local_ip4,
self.pg1.remote_hosts[10].ip4,
)
#
# add an ARP entry that's not on the sub-net and so whose
# adj-fib fails the refinement check. then send an ARP request
# from that source
#
a1 = VppNeighbor(
self, self.pg0.sw_if_index, self.pg0.remote_mac, "100.100.100.50"
)
a1.add_vpp_config()
p = Ether(dst="ff:ff:ff:ff:ff:ff", src=self.pg0.remote_mac) / ARP(
op="who-has",
hwsrc=self.pg0.remote_mac,
psrc="100.100.100.50",
pdst=self.pg0.remote_ip4,
)
self.send_and_assert_no_replies(self.pg0, p, "ARP req for from failed adj-fib")
#
# ERROR Cases
# 1 - don't respond to ARP request for address not within the
# interface's sub-net
# 1b - nor within the unnumbered subnet
# 1c - nor within the subnet of a different interface
#
p = Ether(dst="ff:ff:ff:ff:ff:ff", src=self.pg0.remote_mac) / ARP(
op="who-has",
hwsrc=self.pg0.remote_mac,
pdst="10.10.10.3",
psrc=self.pg0.remote_ip4,
)
self.send_and_assert_no_replies(
self.pg0, p, "ARP req for non-local destination"
)
self.assertFalse(find_nbr(self, self.pg0.sw_if_index, "10.10.10.3"))
p = Ether(dst="ff:ff:ff:ff:ff:ff", src=self.pg2.remote_mac) / ARP(
op="who-has",
hwsrc=self.pg2.remote_mac,
pdst="10.10.10.3",
psrc=self.pg1.remote_hosts[7].ip4,
)
self.send_and_assert_no_replies(
self.pg0, p, "ARP req for non-local destination - unnum"
)
p = Ether(dst="ff:ff:ff:ff:ff:ff", src=self.pg0.remote_mac) / ARP(
op="who-has",
hwsrc=self.pg0.remote_mac,
pdst=self.pg1.local_ip4,
psrc=self.pg1.remote_ip4,
)
self.send_and_assert_no_replies(self.pg0, p, "ARP req diff sub-net")
self.assertFalse(find_nbr(self, self.pg0.sw_if_index, self.pg1.remote_ip4))
#
# 2 - don't respond to ARP request from an address not within the
# interface's sub-net
# 2b - to a proxied address
# 2c - not within a different interface's sub-net
p = Ether(dst="ff:ff:ff:ff:ff:ff", src=self.pg0.remote_mac) / ARP(
op="who-has",
hwsrc=self.pg0.remote_mac,
psrc="10.10.10.3",
pdst=self.pg0.local_ip4,
)
self.send_and_assert_no_replies(self.pg0, p, "ARP req for non-local source")
p = Ether(dst="ff:ff:ff:ff:ff:ff", src=self.pg2.remote_mac) / ARP(
op="who-has",
hwsrc=self.pg2.remote_mac,
psrc="10.10.10.3",
pdst=self.pg0.local_ip4,
)
self.send_and_assert_no_replies(
self.pg0, p, "ARP req for non-local source - unnum"
)
p = Ether(dst="ff:ff:ff:ff:ff:ff", src=self.pg0.remote_mac) / ARP(
op="who-has",
hwsrc=self.pg0.remote_mac,
psrc=self.pg1.remote_ip4,
pdst=self.pg0.local_ip4,
)
self.send_and_assert_no_replies(self.pg0, p, "ARP req for non-local source 2c")
#
# 3 - don't respond to ARP request from an address that belongs to
# the router
#
p = Ether(dst="ff:ff:ff:ff:ff:ff", src=self.pg0.remote_mac) / ARP(
op="who-has",
hwsrc=self.pg0.remote_mac,
psrc=self.pg0.local_ip4,
pdst=self.pg0.local_ip4,
)
self.send_and_assert_no_replies(self.pg0, p, "ARP req for non-local source")
#
# 4 - don't respond to ARP requests that has mac source different
# from ARP request HW source
#
p = Ether(dst="ff:ff:ff:ff:ff:ff", src=self.pg0.remote_mac) / ARP(
op="who-has",
hwsrc="00:00:00:DE:AD:BE",
psrc=self.pg0.remote_ip4,
pdst=self.pg0.local_ip4,
)
self.send_and_assert_no_replies(self.pg0, p, "ARP req for non-local source")
#
# 5 - don't respond to ARP requests for address within the
# interface's sub-net but not the interface's address
#
self.pg0.generate_remote_hosts(2)
p = Ether(dst="ff:ff:ff:ff:ff:ff", src=self.pg0.remote_mac) / ARP(
op="who-has",
hwsrc=self.pg0.remote_mac,
psrc=self.pg0.remote_hosts[0].ip4,
pdst=self.pg0.remote_hosts[1].ip4,
)
self.send_and_assert_no_replies(
self.pg0, p, "ARP req for non-local destination"
)
#
# cleanup
#
static_arp.remove_vpp_config()
self.pg2.unset_unnumbered(self.pg1.sw_if_index)
# need this to flush the adj-fibs
self.pg2.unset_unnumbered(self.pg1.sw_if_index)
self.pg2.admin_down()
self.pg1.admin_down()
def test_arp_after_mac_change(self):
"""ARP (after MAC address change)"""
#
# Prepare a subinterface
#
subif0 = VppDot1ADSubint(self, self.pg1, 0, 300, 400)
subif0.admin_up()
subif0.config_ip4()
#
# Send a packet to cause ARP generation for the parent interface's remote host
#
p1 = (
Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
/ IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
/ UDP(sport=1234, dport=1234)
/ Raw()
)
self.pg0.add_stream(p1)
self.pg_enable_capture(self.pg_interfaces)
self.pg_start()
rx = self.pg1.get_capture(1)
self.verify_arp_req(
rx[0], self.pg1.local_mac, self.pg1.local_ip4, self.pg1.remote_ip4
)
#
# Send a packet to cause ARP generation for the subinterface's remote host
#
p2 = (
Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
/ IP(src=self.pg0.remote_ip4, dst=subif0.remote_ip4)
/ UDP(sport=1234, dport=1234)
/ Raw()
)
self.pg0.add_stream(p2)
self.pg_enable_capture(self.pg_interfaces)
self.pg_start()
rx = self.pg1.get_capture(1)
self.verify_arp_req(
rx[0],
self.pg1.local_mac,
subif0.local_ip4,
subif0.remote_ip4,
subif0.DOT1AD_TYPE,
)
#
# Change MAC address of the parent interface
#
pg1_mac_saved = self.pg1.local_mac
self.pg1.set_mac(MACAddress("00:00:00:11:22:33"))
#
# Send a packet to cause ARP generation for the parent interface's remote host
# - expect new MAC address is used as the source
#
self.pg0.add_stream(p1)
self.pg_enable_capture(self.pg_interfaces)
self.pg_start()
rx = self.pg1.get_capture(1)
self.verify_arp_req(
rx[0], self.pg1.local_mac, self.pg1.local_ip4, self.pg1.remote_ip4
)
#
# Send a packet to cause ARP generation for the subinterface's remote host
# - expect new MAC address is used as the source
#
self.pg0.add_stream(p2)
self.pg_enable_capture(self.pg_interfaces)
self.pg_start()
rx = self.pg1.get_capture(1)
self.verify_arp_req(
rx[0],
self.pg1.local_mac,
subif0.local_ip4,
subif0.remote_ip4,
subif0.DOT1AD_TYPE,
)
#
# Cleanup
#
subif0.remove_vpp_config()
self.pg1.set_mac(MACAddress(pg1_mac_saved))
def test_proxy_mirror_arp(self):
"""Interface Mirror Proxy ARP"""
#
# When VPP has an interface whose address is also applied to a TAP
# interface on the host, then VPP's TAP interface will be unnumbered
# to the 'real' interface and do proxy ARP from the host.
# the curious aspect of this setup is that ARP requests from the host
# will come from the VPP's own address.
#
self.pg0.generate_remote_hosts(2)
arp_req_from_me = Ether(src=self.pg2.remote_mac, dst="ff:ff:ff:ff:ff:ff") / ARP(
op="who-has",
hwsrc=self.pg2.remote_mac,
pdst=self.pg0.remote_hosts[1].ip4,
psrc=self.pg0.local_ip4,
)
#
# Configure Proxy ARP for the subnet on PG0addresses on pg0
#
self.vapi.proxy_arp_add_del(
proxy={
"table_id": 0,
"low": self.pg0._local_ip4_subnet,
"hi": self.pg0._local_ip4_bcast,
},
is_add=1,
)
# Make pg2 un-numbered to pg0
#
self.pg2.set_unnumbered(self.pg0.sw_if_index)
#
# Enable pg2 for proxy ARP
#
self.pg2.set_proxy_arp()
#
# Send the ARP request with an originating address that
# is VPP's own address
#
rx = self.send_and_expect(self.pg2, [arp_req_from_me], self.pg2)
self.verify_arp_resp(
rx[0],
self.pg2.local_mac,
self.pg2.remote_mac,
self.pg0.remote_hosts[1].ip4,
self.pg0.local_ip4,
)
#
# validate we have not learned an ARP entry as a result of this
#
self.assertFalse(find_nbr(self, self.pg2.sw_if_index, self.pg0.local_ip4))
#
# setup a punt redirect so packets from the uplink go to the tap
#
redirect = VppIpPuntRedirect(
self, self.pg0.sw_if_index, self.pg2.sw_if_index, self.pg0.local_ip4
)
redirect.add_vpp_config()
p_tcp = (
Ether(
src=self.pg0.remote_mac,
dst=self.pg0.local_mac,
)
/ IP(src=self.pg0.remote_ip4, dst=self.pg0.local_ip4)
/ TCP(sport=80, dport=80)
/ Raw()
)
rx = self.send_and_expect(self.pg0, [p_tcp], self.pg2)
# there's no ARP entry so this is an ARP req
self.assertTrue(rx[0].haslayer(ARP))
# and ARP entry for VPP's pg0 address on the host interface
n1 = VppNeighbor(
self,
self.pg2.sw_if_index,
self.pg2.remote_mac,
self.pg0.local_ip4,
is_no_fib_entry=True,
).add_vpp_config()
# now the packets shold forward
rx = self.send_and_expect(self.pg0, [p_tcp], self.pg2)
self.assertFalse(rx[0].haslayer(ARP))
self.assertEqual(rx[0][Ether].dst, self.pg2.remote_mac)
#
# flush the neighbor cache on the uplink
#
af = VppEnum.vl_api_address_family_t
self.vapi.ip_neighbor_flush(af.ADDRESS_IP4, self.pg0.sw_if_index)
# ensure we can still resolve the ARPs on the uplink
self.pg0.resolve_arp()
self.assertTrue(find_nbr(self, self.pg0.sw_if_index, self.pg0.remote_ip4))
#
# cleanup
#
self.vapi.proxy_arp_add_del(
proxy={
"table_id": 0,
"low": self.pg0._local_ip4_subnet,
"hi": self.pg0._local_ip4_bcast,
},
is_add=0,
)
redirect.remove_vpp_config()
def test_proxy_arp(self):
"""Proxy ARP"""
self.pg1.generate_remote_hosts(2)
#
# Proxy ARP request packets for each interface
#
arp_req_pg0 = Ether(src=self.pg0.remote_mac, dst="ff:ff:ff:ff:ff:ff") / ARP(
op="who-has",
hwsrc=self.pg0.remote_mac,
pdst="10.10.10.3",
psrc=self.pg0.remote_ip4,
)
arp_req_pg0_tagged = (
Ether(src=self.pg0.remote_mac, dst="ff:ff:ff:ff:ff:ff")
/ Dot1Q(vlan=0)
/ ARP(
op="who-has",
hwsrc=self.pg0.remote_mac,
pdst="10.10.10.3",
psrc=self.pg0.remote_ip4,
)
)
arp_req_pg1 = Ether(src=self.pg1.remote_mac, dst="ff:ff:ff:ff:ff:ff") / ARP(
op="who-has",
hwsrc=self.pg1.remote_mac,
pdst="10.10.10.3",
psrc=self.pg1.remote_ip4,
)
arp_req_pg2 = Ether(src=self.pg2.remote_mac, dst="ff:ff:ff:ff:ff:ff") / ARP(
op="who-has",
hwsrc=self.pg2.remote_mac,
pdst="10.10.10.3",
psrc=self.pg1.remote_hosts[1].ip4,
)
arp_req_pg3 = Ether(src=self.pg3.remote_mac, dst="ff:ff:ff:ff:ff:ff") / ARP(
op="who-has",
hwsrc=self.pg3.remote_mac,
pdst="10.10.10.3",
psrc=self.pg3.remote_ip4,
)
#
# Configure Proxy ARP for 10.10.10.0 -> 10.10.10.124
#
self.vapi.proxy_arp_add_del(
proxy={"table_id": 0, "low": "10.10.10.2", "hi": "10.10.10.124"}, is_add=1
)
#
# No responses are sent when the interfaces are not enabled for proxy
# ARP
#
self.send_and_assert_no_replies(
self.pg0, arp_req_pg0, "ARP req from unconfigured interface"
)
self.send_and_assert_no_replies(
self.pg2, arp_req_pg2, "ARP req from unconfigured interface"
)
#
# Make pg2 un-numbered to pg1
# still won't reply.
#
self.pg2.set_unnumbered(self.pg1.sw_if_index)
self.send_and_assert_no_replies(
self.pg2, arp_req_pg2, "ARP req from unnumbered interface"
)
#
# Enable each interface to reply to proxy ARPs
#
for i in self.pg_interfaces:
i.set_proxy_arp()
#
# Now each of the interfaces should reply to a request to a proxied
# address
#
self.pg0.add_stream(arp_req_pg0)
self.pg_enable_capture(self.pg_interfaces)
self.pg_start()
rx = self.pg0.get_capture(1)
self.verify_arp_resp(
rx[0],
self.pg0.local_mac,
self.pg0.remote_mac,
"10.10.10.3",
self.pg0.remote_ip4,
)
self.pg0.add_stream(arp_req_pg0_tagged)
self.pg_enable_capture(self.pg_interfaces)
self.pg_start()
rx = self.pg0.get_capture(1)
self.verify_arp_resp(
rx[0],
self.pg0.local_mac,
self.pg0.remote_mac,
"10.10.10.3",
self.pg0.remote_ip4,
)
self.pg1.add_stream(arp_req_pg1)
self.pg_enable_capture(self.pg_interfaces)
self.pg_start()
rx = self.pg1.get_capture(1)
self.verify_arp_resp(
rx[0],
self.pg1.local_mac,
self.pg1.remote_mac,
"10.10.10.3",
self.pg1.remote_ip4,
)
self.pg2.add_stream(arp_req_pg2)
self.pg_enable_capture(self.pg_interfaces)
self.pg_start()
rx = self.pg2.get_capture(1)
self.verify_arp_resp(
rx[0],
self.pg2.local_mac,
self.pg2.remote_mac,
"10.10.10.3",
self.pg1.remote_hosts[1].ip4,
)
#
# A request for an address out of the configured range
#
arp_req_pg1_hi = Ether(src=self.pg1.remote_mac, dst="ff:ff:ff:ff:ff:ff") / ARP(
op="who-has",
hwsrc=self.pg1.remote_mac,
pdst="10.10.10.125",
psrc=self.pg1.remote_ip4,
)
self.send_and_assert_no_replies(
self.pg1, arp_req_pg1_hi, "ARP req out of range HI"
)
arp_req_pg1_low = Ether(src=self.pg1.remote_mac, dst="ff:ff:ff:ff:ff:ff") / ARP(
op="who-has",
hwsrc=self.pg1.remote_mac,
pdst="10.10.10.1",
psrc=self.pg1.remote_ip4,
)
self.send_and_assert_no_replies(
self.pg1, arp_req_pg1_low, "ARP req out of range Low"
)
#
# Request for an address in the proxy range but from an interface
# in a different VRF
#
self.send_and_assert_no_replies(
self.pg3, arp_req_pg3, "ARP req from different VRF"
)
#
# Disable Each interface for proxy ARP
# - expect none to respond
#
for i in self.pg_interfaces:
i.set_proxy_arp(0)
self.send_and_assert_no_replies(self.pg0, arp_req_pg0, "ARP req from disable")
self.send_and_assert_no_replies(self.pg1, arp_req_pg1, "ARP req from disable")
self.send_and_assert_no_replies(self.pg2, arp_req_pg2, "ARP req from disable")
#
# clean up on interface 2
#
self.pg2.unset_unnumbered(self.pg1.sw_if_index)
def test_mpls(self):
"""MPLS"""
#
# Interface 2 does not yet have ip4 config
#
self.pg2.config_ip4()
self.pg2.generate_remote_hosts(2)
#
# Add a route with out going label via an ARP unresolved next-hop
#
ip_10_0_0_1 = VppIpRoute(
self,
"10.0.0.1",
32,
[
VppRoutePath(
self.pg2.remote_hosts[1].ip4, self.pg2.sw_if_index, labels=[55]
)
],
)
ip_10_0_0_1.add_vpp_config()
#
# packets should generate an ARP request
#
p = (
Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
/ IP(src=self.pg0.remote_ip4, dst="10.0.0.1")
/ UDP(sport=1234, dport=1234)
/ Raw(b"\xa5" * 100)
)
self.pg0.add_stream(p)
self.pg_enable_capture(self.pg_interfaces)
self.pg_start()
rx = self.pg2.get_capture(1)
self.verify_arp_req(
rx[0], self.pg2.local_mac, self.pg2.local_ip4, self.pg2._remote_hosts[1].ip4
)
#
# now resolve the neighbours
#
self.pg2.configure_ipv4_neighbors()
#
# Now packet should be properly MPLS encapped.
# This verifies that MPLS link-type adjacencies are completed
# when the ARP entry resolves
#
self.pg0.add_stream(p)
self.pg_enable_capture(self.pg_interfaces)
self.pg_start()
rx = self.pg2.get_capture(1)
self.verify_ip_o_mpls(
rx[0],
self.pg2.local_mac,
self.pg2.remote_hosts[1].mac,
55,
self.pg0.remote_ip4,
"10.0.0.1",
)
self.pg2.unconfig_ip4()
def test_arp_vrrp(self):
"""ARP reply with VRRP virtual src hw addr"""
#
# IP packet destined for pg1 remote host arrives on pg0 resulting
# in an ARP request for the address of the remote host on pg1
#
p0 = (
Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
/ IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
/ UDP(sport=1234, dport=1234)
/ Raw()
)
rx1 = self.send_and_expect(self.pg0, [p0], self.pg1)
self.verify_arp_req(
rx1[0], self.pg1.local_mac, self.pg1.local_ip4, self.pg1.remote_ip4
)
#
# ARP reply for address of pg1 remote host arrives on pg1 with
# the hw src addr set to a value in the VRRP IPv4 range of
# MAC addresses
#
p1 = Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) / ARP(
op="is-at",
hwdst=self.pg1.local_mac,
hwsrc="00:00:5e:00:01:09",
pdst=self.pg1.local_ip4,
psrc=self.pg1.remote_ip4,
)
self.send_and_assert_no_replies(self.pg1, p1, "ARP reply")
#
# IP packet destined for pg1 remote host arrives on pg0 again.
# VPP should have an ARP entry for that address now and the packet
# should be sent out pg1.
#
rx1 = self.send_and_expect(self.pg0, [p0], self.pg1)
self.verify_ip(
rx1[0],
self.pg1.local_mac,
"00:00:5e:00:01:09",
self.pg0.remote_ip4,
self.pg1.remote_ip4,
)
self.pg1.admin_down()
self.pg1.admin_up()
def test_arp_duplicates(self):
"""ARP Duplicates"""
#
# Generate some hosts on the LAN
#
self.pg1.generate_remote_hosts(3)
#
# Add host 1 on pg1 and pg2
#
arp_pg1 = VppNeighbor(
self,
self.pg1.sw_if_index,
self.pg1.remote_hosts[1].mac,
self.pg1.remote_hosts[1].ip4,
)
arp_pg1.add_vpp_config()
arp_pg2 = VppNeighbor(
self,
self.pg2.sw_if_index,
self.pg2.remote_mac,
self.pg1.remote_hosts[1].ip4,
)
arp_pg2.add_vpp_config()
#
# IP packet destined for pg1 remote host arrives on pg1 again.
#
p = (
Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
/ IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_hosts[1].ip4)
/ UDP(sport=1234, dport=1234)
/ Raw()
)
self.pg0.add_stream(p)
self.pg_enable_capture(self.pg_interfaces)
self.pg_start()
rx1 = self.pg1.get_capture(1)
self.verify_ip(
rx1[0],
self.pg1.local_mac,
self.pg1.remote_hosts[1].mac,
self.pg0.remote_ip4,
self.pg1.remote_hosts[1].ip4,
)
#
# remove the duplicate on pg1
# packet stream should generate ARPs out of pg1
#
arp_pg1.remove_vpp_config()
self.pg0.add_stream(p)
self.pg_enable_capture(self.pg_interfaces)
self.pg_start()
rx1 = self.pg1.get_capture(1)
self.verify_arp_req(
rx1[0], self.pg1.local_mac, self.pg1.local_ip4, self.pg1.remote_hosts[1].ip4
)
#
# Add it back
#
arp_pg1.add_vpp_config()
self.pg0.add_stream(p)
self.pg_enable_capture(self.pg_interfaces)
self.pg_start()
rx1 = self.pg1.get_capture(1)
self.verify_ip(
rx1[0],
self.pg1.local_mac,
self.pg1.remote_hosts[1].mac,
self.pg0.remote_ip4,
self.pg1.remote_hosts[1].ip4,
)
def test_arp_static(self):
"""ARP Static"""
self.pg2.generate_remote_hosts(3)
#
# Add a static ARP entry
#
static_arp = VppNeighbor(
self,
self.pg2.sw_if_index,
self.pg2.remote_hosts[1].mac,
self.pg2.remote_hosts[1].ip4,
is_static=1,
)
static_arp.add_vpp_config()
#
# Add the connected prefix to the interface
#
self.pg2.config_ip4()
#
# We should now find the adj-fib
#
self.assertTrue(
find_nbr(
self, self.pg2.sw_if_index, self.pg2.remote_hosts[1].ip4, is_static=1
)
)
self.assertTrue(find_route(self, self.pg2.remote_hosts[1].ip4, 32))
#
# remove the connected
#
self.pg2.unconfig_ip4()
#
# put the interface into table 1
#
self.pg2.set_table_ip4(1)
#
# configure the same connected and expect to find the
# adj fib in the new table
#
self.pg2.config_ip4()
self.assertTrue(find_route(self, self.pg2.remote_hosts[1].ip4, 32, table_id=1))
#
# clean-up
#
self.pg2.unconfig_ip4()
static_arp.remove_vpp_config()
self.pg2.set_table_ip4(0)
def test_arp_static_replace_dynamic_same_mac(self):
"""ARP Static can replace Dynamic (same mac)"""
self.pg2.generate_remote_hosts(1)
dyn_arp = VppNeighbor(
self,
self.pg2.sw_if_index,
self.pg2.remote_hosts[0].mac,
self.pg2.remote_hosts[0].ip4,
)
static_arp = VppNeighbor(
self,
self.pg2.sw_if_index,
self.pg2.remote_hosts[0].mac,
self.pg2.remote_hosts[0].ip4,
is_static=1,
)
#
# Add a dynamic ARP entry
#
dyn_arp.add_vpp_config()
#
# We should find the dynamic nbr
#
self.assertFalse(
find_nbr(
self, self.pg2.sw_if_index, self.pg2.remote_hosts[0].ip4, is_static=1
)
)
self.assertTrue(
find_nbr(
self,
self.pg2.sw_if_index,
self.pg2.remote_hosts[0].ip4,
is_static=0,
mac=self.pg2.remote_hosts[0].mac,
)
)
#
# Add a static ARP entry with the same mac
#
static_arp.add_vpp_config()
#
# We should now find the static nbr with the same mac
#
self.assertFalse(
find_nbr(
self, self.pg2.sw_if_index, self.pg2.remote_hosts[0].ip4, is_static=0
)
)
self.assertTrue(
find_nbr(
self,
self.pg2.sw_if_index,
self.pg2.remote_hosts[0].ip4,
is_static=1,
mac=self.pg2.remote_hosts[0].mac,
)
)
#
# clean-up
#
static_arp.remove_vpp_config()
def test_arp_static_replace_dynamic_diff_mac(self):
"""ARP Static can replace Dynamic (diff mac)"""
self.pg2.generate_remote_hosts(2)
dyn_arp = VppNeighbor(
self,
self.pg2.sw_if_index,
self.pg2.remote_hosts[0].mac,
self.pg2.remote_hosts[0].ip4,
)
static_arp = VppNeighbor(
self,
self.pg2.sw_if_index,
self.pg2.remote_hosts[1].mac,
self.pg2.remote_hosts[0].ip4,
is_static=1,
)
#
# Add a dynamic ARP entry
#
dyn_arp.add_vpp_config()
#
# We should find the dynamic nbr
#
self.assertFalse(
find_nbr(
self, self.pg2.sw_if_index, self.pg2.remote_hosts[0].ip4, is_static=1
)
)
self.assertTrue(
find_nbr(
self,
self.pg2.sw_if_index,
self.pg2.remote_hosts[0].ip4,
is_static=0,
mac=self.pg2.remote_hosts[0].mac,
)
)
#
# Add a static ARP entry with a changed mac
#
static_arp.add_vpp_config()
#
# We should now find the static nbr with a changed mac
#
self.assertFalse(
find_nbr(
self, self.pg2.sw_if_index, self.pg2.remote_hosts[0].ip4, is_static=0
)
)
self.assertTrue(
find_nbr(
self,
self.pg2.sw_if_index,
self.pg2.remote_hosts[0].ip4,
is_static=1,
mac=self.pg2.remote_hosts[1].mac,
)
)
#
# clean-up
#
static_arp.remove_vpp_config()
def test_arp_incomplete(self):
"""ARP Incomplete"""
self.pg1.generate_remote_hosts(4)
p0 = (
Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
/ IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_hosts[1].ip4)
/ UDP(sport=1234, dport=1234)
/ Raw()
)
p1 = (
Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
/ IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_hosts[2].ip4)
/ UDP(sport=1234, dport=1234)
/ Raw()
)
p2 = (
Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
/ IP(src=self.pg0.remote_ip4, dst="1.1.1.1")
/ UDP(sport=1234, dport=1234)
/ Raw()
)
#
# a packet to an unresolved destination generates an ARP request
#
rx = self.send_and_expect(self.pg0, [p0], self.pg1)
self.verify_arp_req(
rx[0], self.pg1.local_mac, self.pg1.local_ip4, self.pg1._remote_hosts[1].ip4
)
#
# add a neighbour for remote host 1
#
static_arp = VppNeighbor(
self,
self.pg1.sw_if_index,
self.pg1.remote_hosts[1].mac,
self.pg1.remote_hosts[1].ip4,
is_static=1,
)
static_arp.add_vpp_config()
#
# add a route through remote host 3 hence we get an incomplete
#
VppIpRoute(
self,
"1.1.1.1",
32,
[VppRoutePath(self.pg1.remote_hosts[3].ip4, self.pg1.sw_if_index)],
).add_vpp_config()
rx = self.send_and_expect(self.pg0, [p2], self.pg1)
self.verify_arp_req(
rx[0], self.pg1.local_mac, self.pg1.local_ip4, self.pg1._remote_hosts[3].ip4
)
#
# change the interface's MAC
#
self.vapi.sw_interface_set_mac_address(
self.pg1.sw_if_index, "00:00:00:33:33:33"
)
#
# now ARP requests come from the new source mac
#
rx = self.send_and_expect(self.pg0, [p1], self.pg1)
self.verify_arp_req(
rx[0],
"00:00:00:33:33:33",
self.pg1.local_ip4,
self.pg1._remote_hosts[2].ip4,
)
rx = self.send_and_expect(self.pg0, [p2], self.pg1)
self.verify_arp_req(
rx[0],
"00:00:00:33:33:33",
self.pg1.local_ip4,
self.pg1._remote_hosts[3].ip4,
)
#
# packets to the resolved host also have the new source mac
#
rx = self.send_and_expect(self.pg0, [p0], self.pg1)
self.verify_ip(
rx[0],
"00:00:00:33:33:33",
self.pg1.remote_hosts[1].mac,
self.pg0.remote_ip4,
self.pg1.remote_hosts[1].ip4,
)
#
# set the mac address on the interface that does not have a
# configured subnet and thus no glean
#
self.vapi.sw_interface_set_mac_address(
self.pg2.sw_if_index, "00:00:00:33:33:33"
)
def test_garp(self):
"""GARP"""
#
# Generate some hosts on the LAN
#
self.pg1.generate_remote_hosts(4)
self.pg2.generate_remote_hosts(4)
#
# And an ARP entry
#
arp = VppNeighbor(
self,
self.pg1.sw_if_index,
self.pg1.remote_hosts[1].mac,
self.pg1.remote_hosts[1].ip4,
)
arp.add_vpp_config()
self.assertTrue(
find_nbr(
self,
self.pg1.sw_if_index,
self.pg1.remote_hosts[1].ip4,
mac=self.pg1.remote_hosts[1].mac,
)
)
#
# Send a GARP (request) to swap the host 1's address to that of host 2
#
p1 = Ether(dst="ff:ff:ff:ff:ff:ff", src=self.pg1.remote_hosts[2].mac) / ARP(
op="who-has",
hwdst=self.pg1.local_mac,
hwsrc=self.pg1.remote_hosts[2].mac,
pdst=self.pg1.remote_hosts[1].ip4,
psrc=self.pg1.remote_hosts[1].ip4,
)
self.pg1.add_stream(p1)
self.pg_enable_capture(self.pg_interfaces)
self.pg_start()
self.assertTrue(
find_nbr(
self,
self.pg1.sw_if_index,
self.pg1.remote_hosts[1].ip4,
mac=self.pg1.remote_hosts[2].mac,
)
)
self.assert_equal(self.get_arp_rx_garp(self.pg1), 1)
#
# Send a GARP (reply) to swap the host 1's address to that of host 3
#
p1 = Ether(dst="ff:ff:ff:ff:ff:ff", src=self.pg1.remote_hosts[3].mac) / ARP(
op="is-at",
hwdst=self.pg1.local_mac,
hwsrc=self.pg1.remote_hosts[3].mac,
pdst=self.pg1.remote_hosts[1].ip4,
psrc=self.pg1.remote_hosts[1].ip4,
)
self.pg1.add_stream(p1)
self.pg_enable_capture(self.pg_interfaces)
self.pg_start()
self.assertTrue(
find_nbr(
self,
self.pg1.sw_if_index,
self.pg1.remote_hosts[1].ip4,
mac=self.pg1.remote_hosts[3].mac,
)
)
self.assert_equal(self.get_arp_rx_garp(self.pg1), 2)
#
# GARPs (request nor replies) for host we don't know yet
# don't result in new neighbour entries
#
p1 = Ether(dst="ff:ff:ff:ff:ff:ff", src=self.pg1.remote_hosts[3].mac) / ARP(
op="who-has",
hwdst=self.pg1.local_mac,
hwsrc=self.pg1.remote_hosts[3].mac,
pdst=self.pg1.remote_hosts[2].ip4,
psrc=self.pg1.remote_hosts[2].ip4,
)
self.pg1.add_stream(p1)
self.pg_enable_capture(self.pg_interfaces)
self.pg_start()
self.assertFalse(
find_nbr(self, self.pg1.sw_if_index, self.pg1.remote_hosts[2].ip4)
)
p1 = Ether(dst="ff:ff:ff:ff:ff:ff", src=self.pg1.remote_hosts[3].mac) / ARP(
op="is-at",
hwdst=self.pg1.local_mac,
hwsrc=self.pg1.remote_hosts[3].mac,
pdst=self.pg1.remote_hosts[2].ip4,
psrc=self.pg1.remote_hosts[2].ip4,
)
self.pg1.add_stream(p1)
self.pg_enable_capture(self.pg_interfaces)
self.pg_start()
self.assertFalse(
find_nbr(self, self.pg1.sw_if_index, self.pg1.remote_hosts[2].ip4)
)
#
# IP address in different subnets are not learnt
#
self.pg2.configure_ipv4_neighbors()
cntr = self.statistics.get_err_counter(
"/err/arp-reply/l3_dst_address_not_local"
)
for op in ["is-at", "who-has"]:
p1 = [
(
Ether(dst="ff:ff:ff:ff:ff:ff", src=self.pg2.remote_hosts[1].mac)
/ ARP(
op=op,
hwdst=self.pg2.local_mac,
hwsrc=self.pg2.remote_hosts[1].mac,
pdst=self.pg2.remote_hosts[1].ip4,
psrc=self.pg2.remote_hosts[1].ip4,
)
),
(
Ether(dst="ff:ff:ff:ff:ff:ff", src=self.pg2.remote_hosts[1].mac)
/ ARP(
op=op,
hwdst="ff:ff:ff:ff:ff:ff",
hwsrc=self.pg2.remote_hosts[1].mac,
pdst=self.pg2.remote_hosts[1].ip4,
psrc=self.pg2.remote_hosts[1].ip4,
)
),
]
self.send_and_assert_no_replies(self.pg1, p1)
self.assertFalse(
find_nbr(self, self.pg1.sw_if_index, self.pg2.remote_hosts[1].ip4)
)
# they are all dropped because the subnet's don't match
self.assertEqual(
cntr + 4,
self.statistics.get_err_counter("/err/arp-reply/l3_dst_address_not_local"),
)
def test_arp_incomplete2(self):
"""Incomplete Entries"""
#
# ensure that we throttle the ARP and ND requests
#
self.pg0.generate_remote_hosts(2)
#
# IPv4/ARP
#
ip_10_0_0_1 = VppIpRoute(
self,
"10.0.0.1",
32,
[VppRoutePath(self.pg0.remote_hosts[1].ip4, self.pg0.sw_if_index)],
)
ip_10_0_0_1.add_vpp_config()
p1 = (
Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac)
/ IP(src=self.pg1.remote_ip4, dst="10.0.0.1")
/ UDP(sport=1234, dport=1234)
/ Raw()
)
self.pg1.add_stream(p1 * 257)
self.pg_enable_capture(self.pg_interfaces)
self.pg_start()
rx = self.pg0._get_capture(1)
#
# how many we get is going to be dependent on the time for packet
# processing but it should be small
#
self.assertLess(len(rx), 64)
#
# IPv6/ND
#
ip_10_1 = VppIpRoute(
self,
"10::1",
128,
[
VppRoutePath(
self.pg0.remote_hosts[1].ip6,
self.pg0.sw_if_index,
proto=DpoProto.DPO_PROTO_IP6,
)
],
)
ip_10_1.add_vpp_config()
p1 = (
Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac)
/ IPv6(src=self.pg1.remote_ip6, dst="10::1")
/ UDP(sport=1234, dport=1234)
/ Raw()
)
self.pg1.add_stream(p1 * 257)
self.pg_enable_capture(self.pg_interfaces)
self.pg_start()
rx = self.pg0._get_capture(1)
#
# how many we get is going to be dependent on the time for packet
# processing but it should be small
#
self.assertLess(len(rx), 64)
def test_arp_forus(self):
"""ARP for for-us"""
#
# Test that VPP responds with ARP requests to addresses that
# are connected and local routes.
# Use one of the 'remote' addresses in the subnet as a local address
# The intention of this route is that it then acts like a secondary
# address added to an interface
#
self.pg0.generate_remote_hosts(2)
forus = VppIpRoute(
self,
self.pg0.remote_hosts[1].ip4,
32,
[
VppRoutePath(
"0.0.0.0",
self.pg0.sw_if_index,
type=FibPathType.FIB_PATH_TYPE_LOCAL,
)
],
)
forus.add_vpp_config()
p = Ether(dst="ff:ff:ff:ff:ff:ff", src=self.pg0.remote_mac) / ARP(
op="who-has",
hwdst=self.pg0.local_mac,
hwsrc=self.pg0.remote_mac,
pdst=self.pg0.remote_hosts[1].ip4,
psrc=self.pg0.remote_ip4,
)
rx = self.send_and_expect(self.pg0, [p], self.pg0)
self.verify_arp_resp(
rx[0],
self.pg0.local_mac,
self.pg0.remote_mac,
self.pg0.remote_hosts[1].ip4,
self.pg0.remote_ip4,
)
def test_arp_table_swap(self):
#
# Generate some hosts on the LAN
#
N_NBRS = 4
self.pg1.generate_remote_hosts(N_NBRS)
for n in range(N_NBRS):
# a route thru each neighbour
VppIpRoute(
self,
"10.0.0.%d" % n,
32,
[VppRoutePath(self.pg1.remote_hosts[n].ip4, self.pg1.sw_if_index)],
).add_vpp_config()
# resolve each neighbour
p1 = Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) / ARP(
op="is-at",
hwdst=self.pg1.local_mac,
hwsrc="00:00:5e:00:01:09",
pdst=self.pg1.local_ip4,
psrc=self.pg1.remote_hosts[n].ip4,
)
self.send_and_assert_no_replies(self.pg1, p1, "ARP reply")
self.logger.info(self.vapi.cli("sh ip neighbors"))
#
# swap the table pg1 is in
#
table = VppIpTable(self, 100).add_vpp_config()
self.pg1.unconfig_ip4()
self.pg1.set_table_ip4(100)
self.pg1.config_ip4()
#
# all neighbours are cleared
#
for n in range(N_NBRS):
self.assertFalse(
find_nbr(self, self.pg1.sw_if_index, self.pg1.remote_hosts[n].ip4)
)
#
# packets to all neighbours generate ARP requests
#
for n in range(N_NBRS):
# a route thru each neighbour
VppIpRoute(
self,
"10.0.0.%d" % n,
32,
[VppRoutePath(self.pg1.remote_hosts[n].ip4, self.pg1.sw_if_index)],
table_id=100,
).add_vpp_config()
p = (
Ether(src=self.pg1.remote_hosts[n].mac, dst=self.pg1.local_mac)
/ IP(src=self.pg1.remote_hosts[n].ip4, dst="10.0.0.%d" % n)
/ Raw(b"0x5" * 100)
)
rxs = self.send_and_expect(self.pg1, [p], self.pg1)
for rx in rxs:
self.verify_arp_req(
rx,
self.pg1.local_mac,
self.pg1.local_ip4,
self.pg1.remote_hosts[n].ip4,
)
self.pg1.unconfig_ip4()
self.pg1.set_table_ip4(0)
def test_glean_src_select(self):
"""Multi Connecteds"""
#
# configure multiple connected subnets on an interface
# and ensure that ARP requests for hosts on those subnets
# pick up the correct source address
#
conn1 = VppIpInterfaceAddress(self, self.pg1, "10.0.0.1", 24).add_vpp_config()
conn2 = VppIpInterfaceAddress(self, self.pg1, "10.0.1.1", 24).add_vpp_config()
p1 = (
Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
/ IP(src=self.pg1.remote_ip4, dst="10.0.0.128")
/ Raw(b"0x5" * 100)
)
rxs = self.send_and_expect(self.pg0, [p1], self.pg1)
for rx in rxs:
self.verify_arp_req(rx, self.pg1.local_mac, "10.0.0.1", "10.0.0.128")
p2 = (
Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
/ IP(src=self.pg1.remote_ip4, dst="10.0.1.128")
/ Raw(b"0x5" * 100)
)
rxs = self.send_and_expect(self.pg0, [p2], self.pg1)
for rx in rxs:
self.verify_arp_req(rx, self.pg1.local_mac, "10.0.1.1", "10.0.1.128")
#
# add a local address in the same subnet
# the source addresses are equivalent.
# VPP leaves the glean address being used for a prefix
# in place until that address is deleted.
#
conn3 = VppIpInterfaceAddress(self, self.pg1, "10.0.1.2", 24).add_vpp_config()
rxs = self.send_and_expect(self.pg0, [p2], self.pg1)
for rx in rxs:
self.verify_arp_req(rx, self.pg1.local_mac, "10.0.1.1", "10.0.1.128")
#
# remove first address, which is currently in use
# the second address should be used now
#
conn2.remove_vpp_config()
rxs = self.send_and_expect(self.pg0, [p2], self.pg1)
for rx in rxs:
self.verify_arp_req(rx, self.pg1.local_mac, "10.0.1.2", "10.0.1.128")
#
# add first address back. Second address should continue
# being used.
#
conn2 = VppIpInterfaceAddress(self, self.pg1, "10.0.1.1", 24).add_vpp_config()
rxs = self.send_and_expect(self.pg0, [p2], self.pg1)
for rx in rxs:
self.verify_arp_req(rx, self.pg1.local_mac, "10.0.1.2", "10.0.1.128")
conn1.remove_vpp_config()
rxs = self.send_and_expect(self.pg0, [p2], self.pg1)
for rx in rxs:
self.verify_arp_req(rx, self.pg1.local_mac, "10.0.1.2", "10.0.1.128")
# apply a connected prefix to an interface in a different table
VppIpRoute(
self,
"10.0.1.0",
24,
[VppRoutePath("0.0.0.0", self.pg1.sw_if_index)],
table_id=1,
).add_vpp_config()
p2.dst = self.pg3.local_mac
rxs = self.send_and_expect(self.pg3, [p2], self.pg1)
for rx in rxs:
self.verify_arp_req(rx, self.pg1.local_mac, "10.0.1.2", "10.0.1.128")
# apply an attached prefix to the interface
# since there's no local address in this prefix,
# any other address is used
p3 = (
Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
/ IP(src=self.pg1.remote_ip4, dst="10.0.2.128")
/ Raw(b"0x5" * 100)
)
VppIpRoute(
self,
"10.0.2.0",
24,
[VppRoutePath("0.0.0.0", self.pg1.sw_if_index)],
).add_vpp_config()
rxs = self.send_and_expect(self.pg0, [p3], self.pg1)
for rx in rxs:
self.verify_arp_req(
rx, self.pg1.local_mac, self.pg1.local_ip4, "10.0.2.128"
)
# cleanup
conn3.remove_vpp_config()
conn2.remove_vpp_config()
@tag_fixme_vpp_workers
class NeighborStatsTestCase(VppTestCase):
"""ARP/ND Counters"""
@classmethod
def setUpClass(cls):
super(NeighborStatsTestCase, cls).setUpClass()
@classmethod
def tearDownClass(cls):
super(NeighborStatsTestCase, cls).tearDownClass()
def setUp(self):
super(NeighborStatsTestCase, self).setUp()
self.create_pg_interfaces(range(2))
# pg0 configured with ip4 and 6 addresses used for input
# pg1 configured with ip4 and 6 addresses used for output
# pg2 is unnumbered to pg0
for i in self.pg_interfaces:
i.admin_up()
i.config_ip4()
i.config_ip6()
i.resolve_arp()
i.resolve_ndp()
def tearDown(self):
super(NeighborStatsTestCase, self).tearDown()
for i in self.pg_interfaces:
i.unconfig_ip4()
i.unconfig_ip6()
i.admin_down()
def test_arp_stats(self):
"""ARP Counters"""
self.vapi.cli("adj counters enable")
self.pg1.generate_remote_hosts(2)
arp1 = VppNeighbor(
self,
self.pg1.sw_if_index,
self.pg1.remote_hosts[0].mac,
self.pg1.remote_hosts[0].ip4,
)
arp1.add_vpp_config()
arp2 = VppNeighbor(
self,
self.pg1.sw_if_index,
self.pg1.remote_hosts[1].mac,
self.pg1.remote_hosts[1].ip4,
)
arp2.add_vpp_config()
p1 = (
Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
/ IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_hosts[0].ip4)
/ UDP(sport=1234, dport=1234)
/ Raw()
)
p2 = (
Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
/ IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_hosts[1].ip4)
/ UDP(sport=1234, dport=1234)
/ Raw()
)
rx = self.send_and_expect(self.pg0, p1 * NUM_PKTS, self.pg1)
rx = self.send_and_expect(self.pg0, p2 * NUM_PKTS, self.pg1)
self.assertEqual(NUM_PKTS, arp1.get_stats()["packets"])
self.assertEqual(NUM_PKTS, arp2.get_stats()["packets"])
rx = self.send_and_expect(self.pg0, p1 * NUM_PKTS, self.pg1)
self.assertEqual(NUM_PKTS * 2, arp1.get_stats()["packets"])
def test_nd_stats(self):
"""ND Counters"""
self.vapi.cli("adj counters enable")
self.pg0.generate_remote_hosts(3)
nd1 = VppNeighbor(
self,
self.pg0.sw_if_index,
self.pg0.remote_hosts[1].mac,
self.pg0.remote_hosts[1].ip6,
)
nd1.add_vpp_config()
nd2 = VppNeighbor(
self,
self.pg0.sw_if_index,
self.pg0.remote_hosts[2].mac,
self.pg0.remote_hosts[2].ip6,
)
nd2.add_vpp_config()
p1 = (
Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac)
/ IPv6(src=self.pg1.remote_ip6, dst=self.pg0.remote_hosts[1].ip6)
/ UDP(sport=1234, dport=1234)
/ Raw()
)
p2 = (
Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac)
/ IPv6(src=self.pg1.remote_ip6, dst=self.pg0.remote_hosts[2].ip6)
/ UDP(sport=1234, dport=1234)
/ Raw()
)
rx = self.send_and_expect(self.pg1, p1 * 16, self.pg0)
rx = self.send_and_expect(self.pg1, p2 * 16, self.pg0)
self.assertEqual(16, nd1.get_stats()["packets"])
self.assertEqual(16, nd2.get_stats()["packets"])
rx = self.send_and_expect(self.pg1, p1 * NUM_PKTS, self.pg0)
self.assertEqual(NUM_PKTS + 16, nd1.get_stats()["packets"])
@tag_fixme_ubuntu2204
class NeighborAgeTestCase(VppTestCase):
"""ARP/ND Aging"""
@classmethod
def setUpClass(cls):
super(NeighborAgeTestCase, cls).setUpClass()
@classmethod
def tearDownClass(cls):
super(NeighborAgeTestCase, cls).tearDownClass()
def setUp(self):
super(NeighborAgeTestCase, self).setUp()
self.create_pg_interfaces(range(1))
# pg0 configured with ip4 and 6 addresses used for input
# pg1 configured with ip4 and 6 addresses used for output
# pg2 is unnumbered to pg0
for i in self.pg_interfaces:
i.admin_up()
i.config_ip4()
i.config_ip6()
i.resolve_arp()
i.resolve_ndp()
def tearDown(self):
super(NeighborAgeTestCase, self).tearDown()
for i in self.pg_interfaces:
i.unconfig_ip4()
i.unconfig_ip6()
i.admin_down()
def verify_arp_req(self, rx, smac, sip, dip):
ether = rx[Ether]
self.assertEqual(ether.dst, "ff:ff:ff:ff:ff:ff")
self.assertEqual(ether.src, smac)
arp = rx[ARP]
self.assertEqual(arp.hwtype, 1)
self.assertEqual(arp.ptype, 0x800)
self.assertEqual(arp.hwlen, 6)
self.assertEqual(arp.plen, 4)
self.assertEqual(arp.op, arp_opts["who-has"])
self.assertEqual(arp.hwsrc, smac)
self.assertEqual(arp.hwdst, "00:00:00:00:00:00")
self.assertEqual(arp.psrc, sip)
self.assertEqual(arp.pdst, dip)
def verify_ip_neighbor_config(self, af, max_number, max_age, recycle):
config = self.vapi.ip_neighbor_config_get(af)
self.assertEqual(config.af, af)
self.assertEqual(config.max_number, max_number)
self.assertEqual(config.max_age, max_age)
self.assertEqual(config.recycle, recycle)
def test_age(self):
"""Aging/Recycle"""
self.vapi.cli("set logging unthrottle 0")
self.vapi.cli("set logging size %d" % 0xFFFF)
self.pg0.generate_remote_hosts(201)
vaf = VppEnum.vl_api_address_family_t
#
# start listening on all interfaces
#
self.pg_enable_capture(self.pg_interfaces)
#
# Verify neighbor configuration defaults
#
self.verify_ip_neighbor_config(
af=vaf.ADDRESS_IP4, max_number=50000, max_age=0, recycle=False
)
#
# Set the neighbor configuration:
# limi = 200
# age = 0 seconds
# recycle = false
#
self.vapi.ip_neighbor_config(
af=vaf.ADDRESS_IP4, max_number=200, max_age=0, recycle=False
)
self.verify_ip_neighbor_config(
af=vaf.ADDRESS_IP4, max_number=200, max_age=0, recycle=False
)
self.vapi.cli("sh ip neighbor-config")
# add the 198 neighbours that should pass (-1 for one created in setup)
for ii in range(200):
VppNeighbor(
self,
self.pg0.sw_if_index,
self.pg0.remote_hosts[ii].mac,
self.pg0.remote_hosts[ii].ip4,
).add_vpp_config()
# one more neighbor over the limit should fail
with self.vapi.assert_negative_api_retval():
VppNeighbor(
self,
self.pg0.sw_if_index,
self.pg0.remote_hosts[200].mac,
self.pg0.remote_hosts[200].ip4,
).add_vpp_config()
#
# change the config to allow recycling the old neighbors
#
self.vapi.ip_neighbor_config(
af=vaf.ADDRESS_IP4, max_number=200, max_age=0, recycle=True
)
self.verify_ip_neighbor_config(
af=vaf.ADDRESS_IP4, max_number=200, max_age=0, recycle=True
)
# now new additions are allowed
VppNeighbor(
self,
self.pg0.sw_if_index,
self.pg0.remote_hosts[200].mac,
self.pg0.remote_hosts[200].ip4,
).add_vpp_config()
# add the first neighbor we configured has been re-used
self.assertFalse(
find_nbr(self, self.pg0.sw_if_index, self.pg0.remote_hosts[0].ip4)
)
self.assertTrue(
find_nbr(self, self.pg0.sw_if_index, self.pg0.remote_hosts[200].ip4)
)
#
# change the config to age old neighbors
#
self.vapi.ip_neighbor_config(
af=vaf.ADDRESS_IP4, max_number=200, max_age=2, recycle=True
)
self.verify_ip_neighbor_config(
af=vaf.ADDRESS_IP4, max_number=200, max_age=2, recycle=True
)
self.vapi.cli("sh ip4 neighbor-sorted")
# age out neighbors
self.virtual_sleep(3)
#
# expect probes from all these ARP entries as they age
# 3 probes for each neighbor 3*200 = 600
rxs = self.pg0.get_capture(600, timeout=2)
for ii in range(3):
for jj in range(200):
rx = rxs[ii * 200 + jj]
# rx.show()
#
# 3 probes sent then 1 more second to see if a reply comes, before
# they age out
#
self.virtual_sleep(1)
self.assertFalse(
self.vapi.ip_neighbor_dump(sw_if_index=0xFFFFFFFF, af=vaf.ADDRESS_IP4)
)
#
# load up some neighbours again with 2s aging enabled
# they should be removed after 10s (2s age + 4s for probes + gap)
# check for the add and remove events
#
enum = VppEnum.vl_api_ip_neighbor_event_flags_t
self.vapi.want_ip_neighbor_events_v2(enable=1)
for ii in range(10):
VppNeighbor(
self,
self.pg0.sw_if_index,
self.pg0.remote_hosts[ii].mac,
self.pg0.remote_hosts[ii].ip4,
).add_vpp_config()
e = self.vapi.wait_for_event(1, "ip_neighbor_event_v2")
self.assertEqual(e.flags, enum.IP_NEIGHBOR_API_EVENT_FLAG_ADDED)
self.assertEqual(str(e.neighbor.ip_address), self.pg0.remote_hosts[ii].ip4)
self.assertEqual(e.neighbor.mac_address, self.pg0.remote_hosts[ii].mac)
self.virtual_sleep(10)
self.assertFalse(
self.vapi.ip_neighbor_dump(sw_if_index=0xFFFFFFFF, af=vaf.ADDRESS_IP4)
)
evs = []
for ii in range(10):
e = self.vapi.wait_for_event(1, "ip_neighbor_event_v2")
self.assertEqual(e.flags, enum.IP_NEIGHBOR_API_EVENT_FLAG_REMOVED)
evs.append(e)
# check we got the correct mac/ip pairs - done separately
# because we don't care about the order the remove notifications
# arrive
for ii in range(10):
found = False
mac = self.pg0.remote_hosts[ii].mac
ip = self.pg0.remote_hosts[ii].ip4
for e in evs:
if e.neighbor.mac_address == mac and str(e.neighbor.ip_address) == ip:
found = True
break
self.assertTrue(found)
#
# check if we can set age and recycle with empty neighbor list
#
self.vapi.ip_neighbor_config(
af=vaf.ADDRESS_IP4, max_number=200, max_age=1000, recycle=True
)
self.verify_ip_neighbor_config(
af=vaf.ADDRESS_IP4, max_number=200, max_age=1000, recycle=True
)
#
# load up some neighbours again, then disable the aging
# they should still be there in 10 seconds time
#
for ii in range(10):
VppNeighbor(
self,
self.pg0.sw_if_index,
self.pg0.remote_hosts[ii].mac,
self.pg0.remote_hosts[ii].ip4,
).add_vpp_config()
self.vapi.ip_neighbor_config(
af=vaf.ADDRESS_IP4, max_number=200, max_age=0, recycle=False
)
self.verify_ip_neighbor_config(
af=vaf.ADDRESS_IP4, max_number=200, max_age=0, recycle=False
)
self.virtual_sleep(10)
self.assertTrue(
find_nbr(self, self.pg0.sw_if_index, self.pg0.remote_hosts[0].ip4)
)
class NeighborReplaceTestCase(VppTestCase):
"""ARP/ND Replacement"""
@classmethod
def setUpClass(cls):
super(NeighborReplaceTestCase, cls).setUpClass()
@classmethod
def tearDownClass(cls):
super(NeighborReplaceTestCase, cls).tearDownClass()
def setUp(self):
super(NeighborReplaceTestCase, self).setUp()
self.create_pg_interfaces(range(4))
# pg0 configured with ip4 and 6 addresses used for input
# pg1 configured with ip4 and 6 addresses used for output
# pg2 is unnumbered to pg0
for i in self.pg_interfaces:
i.admin_up()
i.config_ip4()
i.config_ip6()
i.resolve_arp()
i.resolve_ndp()
def tearDown(self):
super(NeighborReplaceTestCase, self).tearDown()
for i in self.pg_interfaces:
i.unconfig_ip4()
i.unconfig_ip6()
i.admin_down()
def test_replace(self):
"""replace"""
N_HOSTS = 16
for i in self.pg_interfaces:
i.generate_remote_hosts(N_HOSTS)
i.configure_ipv4_neighbors()
i.configure_ipv6_neighbors()
# replace them all
self.vapi.ip_neighbor_replace_begin()
self.vapi.ip_neighbor_replace_end()
for i in self.pg_interfaces:
for h in range(N_HOSTS):
self.assertFalse(
find_nbr(self, self.pg0.sw_if_index, self.pg0.remote_hosts[h].ip4)
)
self.assertFalse(
find_nbr(self, self.pg0.sw_if_index, self.pg0.remote_hosts[h].ip6)
)
#
# and them all back via the API
#
for i in self.pg_interfaces:
for h in range(N_HOSTS):
VppNeighbor(
self, i.sw_if_index, i.remote_hosts[h].mac, i.remote_hosts[h].ip4
).add_vpp_config()
VppNeighbor(
self, i.sw_if_index, i.remote_hosts[h].mac, i.remote_hosts[h].ip6
).add_vpp_config()
#
# begin the replacement again, this time touch some
# the neighbours on pg1 so they are not deleted
#
self.vapi.ip_neighbor_replace_begin()
# update from the API all neighbours on pg1
for h in range(N_HOSTS):
VppNeighbor(
self,
self.pg1.sw_if_index,
self.pg1.remote_hosts[h].mac,
self.pg1.remote_hosts[h].ip4,
).add_vpp_config()
VppNeighbor(
self,
self.pg1.sw_if_index,
self.pg1.remote_hosts[h].mac,
self.pg1.remote_hosts[h].ip6,
).add_vpp_config()
# update from the data-plane all neighbours on pg3
self.pg3.configure_ipv4_neighbors()
self.pg3.configure_ipv6_neighbors()
# complete the replacement
self.logger.info(self.vapi.cli("sh ip neighbors"))
self.vapi.ip_neighbor_replace_end()
for i in self.pg_interfaces:
if i == self.pg1 or i == self.pg3:
# neighbours on pg1 and pg3 are still present
for h in range(N_HOSTS):
self.assertTrue(
find_nbr(self, i.sw_if_index, i.remote_hosts[h].ip4)
)
self.assertTrue(
find_nbr(self, i.sw_if_index, i.remote_hosts[h].ip6)
)
else:
# all other neighbours are toast
for h in range(N_HOSTS):
self.assertFalse(
find_nbr(self, i.sw_if_index, i.remote_hosts[h].ip4)
)
self.assertFalse(
find_nbr(self, i.sw_if_index, i.remote_hosts[h].ip6)
)
class NeighborFlush(VppTestCase):
"""Neighbor Flush"""
@classmethod
def setUpClass(cls):
super(NeighborFlush, cls).setUpClass()
@classmethod
def tearDownClass(cls):
super(NeighborFlush, cls).tearDownClass()
def setUp(self):
super(NeighborFlush, self).setUp()
self.create_pg_interfaces(range(2))
for i in self.pg_interfaces:
i.admin_up()
i.config_ip4()
i.config_ip6()
i.resolve_arp()
i.resolve_ndp()
def tearDown(self):
super(NeighborFlush, self).tearDown()
for i in self.pg_interfaces:
i.unconfig_ip4()
i.unconfig_ip6()
i.admin_down()
def test_flush(self):
"""Neighbour Flush"""
e = VppEnum
nf = e.vl_api_ip_neighbor_flags_t
af = e.vl_api_address_family_t
N_HOSTS = 16
static = [False, True]
self.pg0.generate_remote_hosts(N_HOSTS)
self.pg1.generate_remote_hosts(N_HOSTS)
for s in static:
# a few v4 and v6 dynamic neoghbors
for n in range(N_HOSTS):
VppNeighbor(
self,
self.pg0.sw_if_index,
self.pg0.remote_hosts[n].mac,
self.pg0.remote_hosts[n].ip4,
is_static=s,
).add_vpp_config()
VppNeighbor(
self,
self.pg1.sw_if_index,
self.pg1.remote_hosts[n].mac,
self.pg1.remote_hosts[n].ip6,
is_static=s,
).add_vpp_config()
# flush the interfaces individually
self.vapi.ip_neighbor_flush(af.ADDRESS_IP4, self.pg0.sw_if_index)
# check we haven't flushed that which we shouldn't
for n in range(N_HOSTS):
self.assertTrue(
find_nbr(
self,
self.pg1.sw_if_index,
self.pg1.remote_hosts[n].ip6,
is_static=s,
)
)
self.vapi.ip_neighbor_flush(af.ADDRESS_IP6, self.pg1.sw_if_index)
for n in range(N_HOSTS):
self.assertFalse(
find_nbr(self, self.pg0.sw_if_index, self.pg0.remote_hosts[n].ip4)
)
self.assertFalse(
find_nbr(self, self.pg1.sw_if_index, self.pg1.remote_hosts[n].ip6)
)
# add the nieghbours back
for n in range(N_HOSTS):
VppNeighbor(
self,
self.pg0.sw_if_index,
self.pg0.remote_hosts[n].mac,
self.pg0.remote_hosts[n].ip4,
is_static=s,
).add_vpp_config()
VppNeighbor(
self,
self.pg1.sw_if_index,
self.pg1.remote_hosts[n].mac,
self.pg1.remote_hosts[n].ip6,
is_static=s,
).add_vpp_config()
self.logger.info(self.vapi.cli("sh ip neighbor"))
# flush both interfaces at the same time
self.vapi.ip_neighbor_flush(af.ADDRESS_IP6, 0xFFFFFFFF)
# check we haven't flushed that which we shouldn't
for n in range(N_HOSTS):
self.assertTrue(
find_nbr(
self,
self.pg0.sw_if_index,
self.pg0.remote_hosts[n].ip4,
is_static=s,
)
)
self.vapi.ip_neighbor_flush(af.ADDRESS_IP4, 0xFFFFFFFF)
for n in range(N_HOSTS):
self.assertFalse(
find_nbr(self, self.pg0.sw_if_index, self.pg0.remote_hosts[n].ip4)
)
self.assertFalse(
find_nbr(self, self.pg1.sw_if_index, self.pg1.remote_hosts[n].ip6)
)
if __name__ == "__main__":
unittest.main(testRunner=VppTestRunner)