vpp/test/test_urpf.py
Pim van Pelt 2fa69effc8 urpf: add interface dump to API
Add a urpf_interface_dump() API call with an optional sw_if_index.
For each interface, address family and direction in which either a
mode or a table is configured, return an entry in the list; otherwise
omit it.

TESTED:
create loopback interface instance 0
create loopback interface instance 1
create loopback interface instance 2
create loopback interface instance 3
ip6 table add 8298
set urpf ip4 rx loose loop1
set urpf ip6 tx off loop2 table 8298

API call urpf_interface_dump(sw_if_index=~1) returns:
[
urpf_interface_details(_0=658, context=2, sw_if_index=2, is_rx=True, mode=<vl_api_urpf_mode_t.URPF_API_MODE_LOOSE: 1>, af=<vl_api_address_family_t.ADDRESS_IP4: 0>, table_id=0),
urpf_interface_details(_0=658, context=2, sw_if_index=3, is_rx=False, mode=<vl_api_urpf_mode_t.URPF_API_MODE_OFF: 0>, af=<vl_api_address_family_t.ADDRESS_IP6: 1>, table_id=8298)
]

Type: improvement
Change-Id: I1ded5c445dc07dab73ea41b817b5827b72ca79d4
Signed-off-by: pim@ipng.nl
2024-01-10 00:28:06 +00:00

378 lines
13 KiB
Python

#!/usr/bin/env python3
import unittest
from framework import VppTestCase
from asfframework import VppTestRunner
from scapy.packet import Raw
from scapy.layers.l2 import Ether
from scapy.layers.inet import IP, UDP
from scapy.layers.inet6 import IPv6
from vpp_papi import VppEnum
# Number of copies of a packet in each stream the tests below send; the
# uRPF drop-counter assertions are expressed as multiples of this value.
N_PKTS = 63
class TestURPF(VppTestCase):
    """Unicast Reverse Path Forwarding Test Case"""

    @classmethod
    def setUpClass(cls):
        super(TestURPF, cls).setUpClass()

    @classmethod
    def tearDownClass(cls):
        super(TestURPF, cls).tearDownClass()

    def setUp(self):
        super(TestURPF, self).setUp()

        # create 4 pg interfaces so there are a few addresses
        # in the FIB
        self.create_pg_interfaces(range(4))

        for i in self.pg_interfaces:
            i.admin_up()
            i.config_ip4()
            i.resolve_arp()
            i.config_ip6()
            i.resolve_ndp()

    def tearDown(self):
        for i in self.pg_interfaces:
            i.unconfig_ip4()
            i.unconfig_ip6()
            i.admin_down()
        super(TestURPF, self).tearDown()

    def _set_urpf(self, itf, af, mode, is_input):
        # Program the uRPF mode of one interface for a single address family
        # and direction (is_input=True for rx, False for tx).
        self.vapi.urpf_update(
            is_input=is_input,
            mode=mode,
            af=af,
            sw_if_index=itf.sw_if_index,
        )

    def _pg0_stream(self, l3, port):
        # Build a list of N_PKTS packets as they would arrive on pg0, carrying
        # the given L3 header and a UDP datagram using `port` as both the
        # source and the destination port.
        return (
            Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
            / l3
            / UDP(sport=port, dport=port)
            / Raw(b"\xa5" * 100)
        ) * N_PKTS

    def test_urpf4(self):
        """uRPF IP4"""

        af = VppEnum.vl_api_address_family_t.ADDRESS_IP4
        mode = VppEnum.vl_api_urpf_mode_t
        dst = self.pg1.remote_ip4

        # source address to which there is no route
        p_spoof_loose = self._pg0_stream(IP(src="3.3.3.3", dst=dst), 1234)
        # source address that is routed, but via pg2 rather than pg0
        p_spoof_strict = self._pg0_stream(IP(src=self.pg2.remote_ip4, dst=dst), 1234)
        # source address that is routed back through the rx interface pg0
        p_good = self._pg0_stream(IP(src=self.pg0.remote_ip4, dst=dst), 1234)

        #
        # before adding the uRPF, ensure all packets are forwarded
        #
        self.send_and_expect(self.pg0, p_good, self.pg1)
        self.send_and_expect(self.pg0, p_spoof_strict, self.pg1)
        self.send_and_expect(self.pg0, p_spoof_loose, self.pg1)

        #
        # apply loose uRPF check on pg0 rx
        #
        self._set_urpf(self.pg0, af, mode.URPF_API_MODE_LOOSE, is_input=True)

        # good packets still pass
        self.send_and_expect(self.pg0, p_good, self.pg1)
        # packets from address for which there is a route are forwarded
        self.send_and_expect(self.pg0, p_spoof_strict, self.pg1)
        # packets from address to which there is no route are dropped
        self.send_and_assert_no_replies(self.pg0, p_spoof_loose)

        self.assert_error_counter_equal("/err/ip4-rx-urpf-loose/uRPF Drop", N_PKTS)

        #
        # crank it up to strict mode
        #
        self._set_urpf(self.pg0, af, mode.URPF_API_MODE_STRICT, is_input=True)

        # good packets still pass
        self.send_and_expect(self.pg0, p_good, self.pg1)
        # packets that would not be routed back thru pg0 are dropped
        self.send_and_assert_no_replies(self.pg0, p_spoof_strict)
        self.send_and_assert_no_replies(self.pg0, p_spoof_loose)

        self.assert_error_counter_equal("/err/ip4-rx-urpf-strict/uRPF Drop", 2 * N_PKTS)

        #
        # disable uRPF, all traffic should pass
        #
        self._set_urpf(self.pg0, af, mode.URPF_API_MODE_OFF, is_input=True)

        self.send_and_expect(self.pg0, p_good, self.pg1)
        self.send_and_expect(self.pg0, p_spoof_strict, self.pg1)
        self.send_and_expect(self.pg0, p_spoof_loose, self.pg1)

        #
        # Now apply in the TX direction
        #  for loose it is the same deal, they should not be forwarded
        #  if there's no route
        #  for strict they should not be forwarded if they would be
        #  forwarded thru that interface.
        #
        self._set_urpf(self.pg1, af, mode.URPF_API_MODE_LOOSE, is_input=False)

        self.send_and_expect(self.pg0, p_good, self.pg1)
        self.send_and_expect(self.pg0, p_spoof_strict, self.pg1)
        self.send_and_assert_no_replies(self.pg0, p_spoof_loose)

        self.assert_error_counter_equal("/err/ip4-tx-urpf-loose/uRPF Drop", N_PKTS)

        self._set_urpf(self.pg1, af, mode.URPF_API_MODE_STRICT, is_input=False)

        self.send_and_expect(self.pg0, p_good, self.pg1)
        # the strict packet, from a peer is allowed, since it does
        # not forward via pg1
        self.send_and_expect(self.pg0, p_spoof_strict, self.pg1)
        self.send_and_assert_no_replies(self.pg0, p_spoof_loose)

        self.assert_error_counter_equal("/err/ip4-tx-urpf-strict/uRPF Drop", N_PKTS)

        # change the strict packet so that it would forward through pg1
        p_spoof_strict = self._pg0_stream(IP(src=self.pg1.remote_ip4, dst=dst), 1234)

        self.send_and_assert_no_replies(self.pg0, p_spoof_strict)
        self.assert_error_counter_equal("/err/ip4-tx-urpf-strict/uRPF Drop", 2 * N_PKTS)

        # cleanup
        self._set_urpf(self.pg1, af, mode.URPF_API_MODE_OFF, is_input=False)

    def test_urpf6(self):
        """uRPF IP6"""

        af = VppEnum.vl_api_address_family_t.ADDRESS_IP6
        mode = VppEnum.vl_api_urpf_mode_t
        dst = self.pg1.remote_ip6

        # source address to which there is no route
        p_spoof_loose = self._pg0_stream(IPv6(src="3::3", dst=dst), 1236)
        # source address that is routed, but via pg2 rather than pg0
        p_spoof_strict = self._pg0_stream(
            IPv6(src=self.pg2.remote_ip6, dst=dst), 1236
        )
        # source address that is routed back through the rx interface pg0
        p_good = self._pg0_stream(IPv6(src=self.pg0.remote_ip6, dst=dst), 1236)

        #
        # before adding the uRPF, ensure all packets are forwarded
        #
        self.send_and_expect(self.pg0, p_good, self.pg1)
        self.send_and_expect(self.pg0, p_spoof_strict, self.pg1)
        self.send_and_expect(self.pg0, p_spoof_loose, self.pg1)

        #
        # apply loose uRPF check on pg0 rx
        #
        self._set_urpf(self.pg0, af, mode.URPF_API_MODE_LOOSE, is_input=True)

        # good packets still pass
        self.send_and_expect(self.pg0, p_good, self.pg1)
        # packets from address for which there is a route are forwarded
        self.send_and_expect(self.pg0, p_spoof_strict, self.pg1)
        # packets from address to which there is no route are dropped
        self.send_and_assert_no_replies(self.pg0, p_spoof_loose)

        self.assert_error_counter_equal("/err/ip6-rx-urpf-loose/uRPF Drop", N_PKTS)

        #
        # crank it up to strict mode
        #
        self._set_urpf(self.pg0, af, mode.URPF_API_MODE_STRICT, is_input=True)

        # good packets still pass
        self.send_and_expect(self.pg0, p_good, self.pg1)
        # packets that would not be routed back thru pg0 are dropped
        self.send_and_assert_no_replies(self.pg0, p_spoof_strict)
        self.send_and_assert_no_replies(self.pg0, p_spoof_loose)

        self.assert_error_counter_equal("/err/ip6-rx-urpf-strict/uRPF Drop", 2 * N_PKTS)

        #
        # disable uRPF, all traffic should pass
        #
        self._set_urpf(self.pg0, af, mode.URPF_API_MODE_OFF, is_input=True)

        self.send_and_expect(self.pg0, p_good, self.pg1)
        self.send_and_expect(self.pg0, p_spoof_strict, self.pg1)
        self.send_and_expect(self.pg0, p_spoof_loose, self.pg1)

        #
        # Now apply in the TX direction
        #  for loose it is the same deal, they should not be forwarded
        #  if there's no route
        #  for strict they should not be forwarded if they would be
        #  forwarded thru that interface.
        #
        self._set_urpf(self.pg1, af, mode.URPF_API_MODE_LOOSE, is_input=False)

        self.send_and_expect(self.pg0, p_good, self.pg1)
        self.send_and_expect(self.pg0, p_spoof_strict, self.pg1)
        self.send_and_assert_no_replies(self.pg0, p_spoof_loose)

        self.assert_error_counter_equal("/err/ip6-tx-urpf-loose/uRPF Drop", N_PKTS)

        self._set_urpf(self.pg1, af, mode.URPF_API_MODE_STRICT, is_input=False)

        self.send_and_expect(self.pg0, p_good, self.pg1)
        # the strict packet, from a peer is allowed, since it does
        # not forward via pg1
        self.send_and_expect(self.pg0, p_spoof_strict, self.pg1)
        self.send_and_assert_no_replies(self.pg0, p_spoof_loose)

        self.assert_error_counter_equal("/err/ip6-tx-urpf-strict/uRPF Drop", N_PKTS)

        # change the strict packet so that it would forward through pg1
        p_spoof_strict = self._pg0_stream(
            IPv6(src=self.pg1.remote_ip6, dst=dst), 1236
        )

        self.send_and_assert_no_replies(self.pg0, p_spoof_strict)
        self.assert_error_counter_equal("/err/ip6-tx-urpf-strict/uRPF Drop", 2 * N_PKTS)

        # cleanup
        self._set_urpf(self.pg1, af, mode.URPF_API_MODE_OFF, is_input=False)

    def test_interface_dump(self):
        """uRPF Interface Dump"""

        self.create_loopback_interfaces(3)

        ip4 = VppEnum.vl_api_address_family_t.ADDRESS_IP4
        ip6 = VppEnum.vl_api_address_family_t.ADDRESS_IP6
        mode = VppEnum.vl_api_urpf_mode_t

        # strict on loop1 (IPv4, rx) and loose on loop2 (IPv6, tx)
        self._set_urpf(self.loop1, ip4, mode.URPF_API_MODE_STRICT, is_input=True)
        self._set_urpf(self.loop2, ip6, mode.URPF_API_MODE_LOOSE, is_input=False)

        # a dump without sw_if_index is expected to list both entries
        ret = self.vapi.urpf_interface_dump()
        self.assertEqual(len(ret), 2)

        dump_loop1 = ret[0]
        dump_loop2 = ret[1]
        self.assertEqual(dump_loop1.sw_if_index, self.loop1.sw_if_index)
        self.assertTrue(dump_loop1.is_input)
        self.assertEqual(dump_loop1.mode, mode.URPF_API_MODE_STRICT)
        self.assertEqual(dump_loop1.af, ip4)
        self.assertEqual(dump_loop2.sw_if_index, self.loop2.sw_if_index)
        self.assertFalse(dump_loop2.is_input)
        self.assertEqual(dump_loop2.mode, mode.URPF_API_MODE_LOOSE)
        self.assertEqual(dump_loop2.af, ip6)

        # a dump restricted to loop1 is expected to list only its entry
        ret = self.vapi.urpf_interface_dump(sw_if_index=self.loop1.sw_if_index)
        self.assertEqual(len(ret), 1)

        dump_loop1 = ret[0]
        self.assertEqual(dump_loop1.sw_if_index, self.loop1.sw_if_index)
        self.assertTrue(dump_loop1.is_input)
        self.assertEqual(dump_loop1.mode, mode.URPF_API_MODE_STRICT)
        self.assertEqual(dump_loop1.af, ip4)

        # cleanup: switch each entry off in the same direction it was enabled
        # in. loop1 was configured on rx, so it must be disabled with
        # is_input=True; disabling tx would leave the rx check in place.
        self._set_urpf(self.loop1, ip4, mode.URPF_API_MODE_OFF, is_input=True)
        self._set_urpf(self.loop2, ip6, mode.URPF_API_MODE_OFF, is_input=False)
# When executed as a script, run this module's tests with the VPP test runner.
if __name__ == "__main__":
    unittest.main(testRunner=VppTestRunner)