npt66: icmp6 alg to handle icmp6 error messages

Support rewriting the inner packet for ICMP6 error messages.

Type: feature
Change-Id: I7e11f53626037075a23310f1cb7e673b0cb52843
Signed-off-by: Ole Troan <otroan@employees.org>
This commit is contained in:
Ole Troan
2023-10-12 18:54:55 +02:00
committed by Ole Tr�an
parent 1fe132ec1a
commit ff344a98af
2 changed files with 131 additions and 18 deletions

View File

@ -159,6 +159,69 @@ done:
return rv;
}
static int
npt66_icmp6_translate (vlib_buffer_t *b, ip6_header_t *outer_ip,
icmp46_header_t *icmp, npt66_binding_t *binding,
int dir)
{
ip6_header_t *ip = (ip6_header_t *) (icmp + 2);
int rv = 0;
vlib_main_t *vm = vlib_get_main ();
if (clib_net_to_host_u16 (outer_ip->payload_length) <
sizeof (icmp46_header_t) + 4 + sizeof (ip6_header_t))
{
clib_warning ("ICMP6 payload too short");
return -1;
}
// Validate checksums
int bogus_length;
u16 sum16;
sum16 = ip6_tcp_udp_icmp_compute_checksum (vm, b, outer_ip, &bogus_length);
if (sum16 != 0 && sum16 != 0xffff)
{
clib_warning ("ICMP6 checksum failed");
return -1;
}
if (dir == VLIB_RX)
{
if (!ip6_prefix_cmp (ip->src_address, binding->external,
binding->external_plen))
{
clib_warning (
"npt66_icmp6_translate: src address is not internal (%U -> %U)",
format_ip6_address, &ip->src_address, format_ip6_address,
&ip->dst_address);
goto done;
}
ip->src_address = ip6_prefix_copy (ip->src_address, binding->internal,
binding->internal_plen);
/* Checksum neutrality */
rv = npt66_adjust_checksum (binding->internal_plen, true, binding->delta,
&ip->src_address);
}
else
{
if (!ip6_prefix_cmp (ip->dst_address, binding->external,
binding->external_plen))
{
clib_warning (
"npt66_icmp6_translate: dst address is not external (%U -> %U)",
format_ip6_address, &ip->src_address, format_ip6_address,
&ip->dst_address);
goto done;
}
ip->dst_address = ip6_prefix_copy (ip->dst_address, binding->internal,
binding->internal_plen);
rv = npt66_adjust_checksum (binding->internal_plen, false,
binding->delta, &ip->dst_address);
}
done:
return rv;
}
/*
* Lookup the packet tuple in the flow cache, given the lookup mask.
* If a binding is found, rewrite the packet according to instructions,
@ -194,8 +257,20 @@ npt66_node_inline (vlib_main_t *vm, vlib_node_runtime_t *node,
/* By default pass packet to next node in the feature chain */
vnet_feature_next_u16 (next, b[0]);
int rv;
icmp46_header_t *icmp = (icmp46_header_t *) (ip + 1);
if (ip->protocol == IP_PROTOCOL_ICMP6 && icmp->type < 128)
{
rv = npt66_icmp6_translate (b[0], ip, icmp, binding, dir);
if (rv < 0)
{
clib_warning ("ICMP6 npt66_translate failed");
*next = NPT66_NEXT_DROP;
goto next;
}
}
rv = npt66_translate (ip, binding, dir);
int rv = npt66_translate (ip, binding, dir);
if (rv < 0)
{
vlib_node_increment_counter (vm, node->node_index,

View File

@ -4,7 +4,7 @@ import unittest
import ipaddress
from framework import VppTestCase, VppTestRunner
from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest
from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest, ICMPv6DestUnreach
from scapy.layers.l2 import Ether
from scapy.packet import Raw
@ -33,7 +33,7 @@ class TestNPT66(VppTestCase):
i.admin_down()
super(TestNPT66, self).tearDown()
def send_and_verify(self, internal):
def send_and_verify(self, internal, reply_icmp_error=False):
sendif = self.pg0
recvif = self.pg1
local_mac = self.pg0.local_mac
@ -47,30 +47,57 @@ class TestNPT66(VppTestCase):
/ ICMPv6EchoRequest()
/ Raw(b"Request")
)
# print('Sending packet')
# p.show2()
rxs = self.send_and_expect(sendif, p, recvif)
for rx in rxs:
# print('Received packet')
# rx.show2()
original_cksum = rx[ICMPv6EchoRequest].cksum
del rx[ICMPv6EchoRequest].cksum
rx = rx.__class__(bytes(rx))
self.assertEqual(original_cksum, rx[ICMPv6EchoRequest].cksum)
# Generate a replies
reply = (
Ether(dst=rx[Ether].src, src=local_mac)
/ IPv6(src=rx[IPv6].dst, dst=rx[IPv6].src)
/ ICMPv6EchoRequest()
/ Raw(b"Reply")
)
if reply_icmp_error:
# print('Generating an ICMP error message')
reply = (
Ether(dst=rx[Ether].src, src=local_mac)
/ IPv6(src=rx[IPv6].dst, dst=rx[IPv6].src)
/ ICMPv6DestUnreach()
/ rx[IPv6]
)
# print('Sending ICMP error message reply')
# reply.show2()
replies = self.send_and_expect(recvif, reply, sendif)
for r in replies:
# print('Received ICMP error message reply on the other side')
# r.show2()
self.assertEqual(str(p[IPv6].src), r[IPv6].dst)
original_cksum = r[ICMPv6EchoRequest].cksum
del r[ICMPv6EchoRequest].cksum
r = r.__class__(bytes(r))
self.assertEqual(original_cksum, r[ICMPv6EchoRequest].cksum)
replies = self.send_and_expect(recvif, reply, sendif)
for r in replies:
self.assertEqual(str(p[IPv6].src), r[IPv6].dst)
original_cksum = r[ICMPv6EchoRequest].cksum
del r[ICMPv6EchoRequest].cksum
r = r.__class__(bytes(r))
self.assertEqual(original_cksum, r[ICMPv6EchoRequest].cksum)
else:
reply = (
Ether(dst=rx[Ether].src, src=local_mac)
/ IPv6(src=rx[IPv6].dst, dst=rx[IPv6].src)
/ ICMPv6EchoRequest()
/ Raw(b"Reply")
)
def do_test(self, internal, external):
replies = self.send_and_expect(recvif, reply, sendif)
for r in replies:
r.show2()
self.assertEqual(str(p[IPv6].src), r[IPv6].dst)
original_cksum = r[ICMPv6EchoRequest].cksum
del r[ICMPv6EchoRequest].cksum
r = r.__class__(bytes(r))
self.assertEqual(original_cksum, r[ICMPv6EchoRequest].cksum)
def do_test(self, internal, external, reply_icmp_error=False):
"""Add NPT66 binding and send packet"""
self.vapi.npt66_binding_add_del(
sw_if_index=self.pg1.sw_if_index,
internal=internal,
@ -80,7 +107,7 @@ class TestNPT66(VppTestCase):
## TODO use route api
self.vapi.cli(f"ip route add {internal} via {self.pg0.remote_ip6}")
self.send_and_verify(internal)
self.send_and_verify(internal, reply_icmp_error=reply_icmp_error)
self.vapi.npt66_binding_add_del(
sw_if_index=self.pg1.sw_if_index,
@ -97,6 +124,17 @@ class TestNPT66(VppTestCase):
self.do_test("fc00:1234::/32", "2001:db8:1::/32")
self.do_test("fc00:1234::/63", "2001:db8:1::/56")
def test_npt66_icmp6(self):
"""Send and receive a packet through NPT66"""
# Test ICMP6 error packets
self.do_test(
"fd00:0000:0000::/48", "2001:4650:c3ed::/48", reply_icmp_error=True
)
self.do_test("fc00:1::/48", "2001:db8:1::/48", reply_icmp_error=True)
self.do_test("fc00:1234::/32", "2001:db8:1::/32", reply_icmp_error=True)
self.do_test("fc00:1234::/63", "2001:db8:1::/56", reply_icmp_error=True)
if __name__ == "__main__":
unittest.main(testRunner=VppTestRunner)