vpp/test/test_pvti.py
Andrew Yourtchenko 0acb398d6d pvti: Packet Vector Tunnel Interface
This plugin implements a PoC of UDP-based tunnel substrate whose aim is
to specifically provide higher MTU to the upper layers by chunking
the payload PDUs into smaller packets with full 5-tuple.

At the same time, if there are multiple small packets to
the same destination during the vector processing, they
are packed into "carrier" packets up to underlay MTU size.

It does assume a trustworthy underlying medium, thus for the
operation over Internet it requires the use of encryption layer
underneath.

Type: feature
Change-Id: I323958fa8de62584f6ed15643ea689568a9a62bc
Signed-off-by: Andrew Yourtchenko <ayourtch@gmail.com>
2024-09-17 12:10:19 +00:00

1154 lines
38 KiB
Python

#!/usr/bin/env python3
""" PVTI tests """
import datetime
import base64
import os
import copy
import struct
from hashlib import blake2s
from config import config
from scapy.packet import Raw
from scapy.compat import raw
from scapy.layers.l2 import Ether
from scapy.layers.inet import IP, UDP
from scapy.layers.inet6 import IPv6
from scapy.layers.vxlan import VXLAN
from vpp_interface import VppInterface
from vpp_pg_interface import is_ipv6_misc
from vpp_ip_route import VppIpRoute, VppRoutePath
from vpp_l2 import VppBridgeDomain, VppBridgeDomainPort
from vpp_vxlan_tunnel import VppVxlanTunnel
from vpp_object import VppObject
from vpp_papi import VppEnum
from asfframework import tag_run_solo, tag_fixme_vpp_debug
from framework import VppTestCase
from re import compile
import unittest
from scapy.packet import Packet, bind_layers
from scapy.layers.l2 import Ether
from scapy.layers.inet import IP, UDP
from scapy.layers.inet6 import IPv6
from scapy.fields import (
FlagsField,
XByteField,
XShortField,
ThreeBytesField,
ConditionalField,
ShortField,
ByteEnumField,
X3BytesField,
LEIntField,
ByteField,
StrLenField,
PacketListField,
LEShortField,
IntField,
ShortField,
XIntField,
)
import sys
def eprint(*args, **kwargs):
print(*args, file=sys.stderr, **kwargs)
#
# A custom decoder for Scapy for PVTI packet format
#
class PVTIChunk(Packet):
name = "PVTIChunk"
fields_desc = [
ShortField("total_chunk_length", None),
XShortField("_pad0", 0),
XIntField("_pad1", 0),
StrLenField("data", "", length_from=lambda pkt: pkt.total_chunk_length - 8),
]
# This prevents the first chunk from consuming the entire remaining
# contents of the packet
def extract_padding(self, s):
return "", s
def post_build(self, p, pay):
if self.total_chunk_length is None and self.data:
chunk_header_size = 8
l = chunk_header_size + len(self.data)
p = struct.pack("!H", l) + p[2:]
return p + pay
class PVTI(Packet):
name = "PVTI"
PVTI_ALIGN_BYTES = 9
fields_desc = [
IntField("seq", 0x0),
ByteField("stream_index", 0),
ByteField("chunk_count", None),
ByteField("reass_chunk_count", 0),
ByteField("mandatory_flags_mask", 0),
ByteField("flags_value", 0),
ByteField("pad_bytes", PVTI_ALIGN_BYTES),
StrLenField(
"pad", b"\xca" * PVTI_ALIGN_BYTES, length_from=lambda pkt: pkt.pad_bytes
),
PacketListField("chunks", [], PVTIChunk, count_from=lambda p: p.chunk_count),
]
def mysummary(self):
return self.sprintf("PVTI (len=%PVTI.total_len%)")
def post_build(self, p, pay):
if self.chunk_count is None:
l = len(self.chunks)
# offset of the chunk count within the fields
offset_of_chunk_count = 5
p = (
p[:offset_of_chunk_count]
+ struct.pack("b", l)
+ p[offset_of_chunk_count + 1 :]
)
return p + pay
bind_layers(UDP, PVTI, dport=12312)
# By default, set both ports to the test
# bind_layers(UDP, PVTI, sport=6192, dport=6192)
# PVTI ENcapsulator/DEcapsulator
class PvtiEnDe(object):
"""
PVTI encapsulator/decapsulator
"""
def __init__(
self,
local_ip,
local_port,
remote_ip,
remote_port,
underlay_mtu=1500,
for_rx_test=False,
):
self.for_rx_test = for_rx_test
self.local_ip = local_ip
self.local_port = local_port
self.remote_ip = remote_ip
self.remote_port = remote_port
self.underlay_mtu = underlay_mtu
self.stream_index = 0
self.tx_chunks = []
self.tx_n_reass_chunks = 0
self.tx_seq = 42
# payload = chunk headers + data
self.max_payload_len = underlay_mtu - len(raw(IP() / UDP() / PVTI()))
self.pvti_header_len = len(raw(PVTI()))
self.chunk_header_len = len(raw(PVTIChunk()))
def get_curr_payload_len(self):
tx_len = 0
for c in self.tx_chunks:
tx_len = tx_len + len(c.data) + self.chunk_header_len
return tx_len
def get_payload_room(self):
return self.max_payload_len - self.get_curr_payload_len()
def flush_tx_chunks(self, more_frags=False):
if self.for_rx_test:
ip_dst = self.local_ip
ip_src = self.remote_ip
else:
ip_src = self.local_ip
ip_dst = self.remote_ip
p = (
IP(
src=ip_src,
dst=ip_dst,
ttl=127,
frag=0,
flags=0,
id=self.tx_seq,
)
/ UDP(sport=self.local_port, dport=self.remote_port, chksum=0)
/ PVTI(
reass_chunk_count=self.tx_n_reass_chunks,
seq=self.tx_seq,
stream_index=self.stream_index,
chunks=self.tx_chunks,
)
)
p = IP(raw(p))
self.tx_n_reass_chunks = 0
self.tx_chunks = []
self.tx_seq = self.tx_seq + 1
return p
def encap_pkt(self, p):
out = []
if IP in p:
p[IP].ttl = p[IP].ttl - 1
payload_wip = p[IP].build()
elif IPv6 in p:
p[IPv6].hlim = p[IPv6].hlim - 1
payload_wip = p[IPv6].build()
split_chunks = False
huge_solo_packet = (
len(payload_wip) + self.chunk_header_len > self.get_payload_room()
) and len(self.tx_chunks) == 0
while True:
available_room = self.get_payload_room()
chunk_wip_len = len(payload_wip) + self.chunk_header_len
xpad0 = 0xABAB
xpad1 = 0xABABABAB
if chunk_wip_len <= available_room:
# happy case - there is enough space to fit the entire chunk
if split_chunks:
self.tx_n_reass_chunks = self.tx_n_reass_chunks + 1
tx = PVTIChunk(data=payload_wip, _pad0=xpad0, _pad1=xpad1)
self.tx_chunks.append(tx)
if chunk_wip_len == available_room:
# an unlikely perfect fit - send this packet.
out.append(self.flush_tx_chunks())
break
elif available_room < self.chunk_header_len + 1:
# Can not fit even a chunk header + 1 byte of data
# Flush and retry
out.append(self.flush_tx_chunks())
continue
else:
# Chop as much as we can from the packet
chop_len = available_room - self.chunk_header_len
if split_chunks:
self.tx_n_reass_chunks = self.tx_n_reass_chunks + 1
tx = PVTIChunk(data=payload_wip[:chop_len], _pad0=xpad0, _pad1=xpad1)
self.tx_chunks.append(tx)
out.append(self.flush_tx_chunks())
split_chunks = True
payload_wip = payload_wip[chop_len:]
continue
return out
def encap_packets(self, pkts):
out = []
self.start_encap()
for p in pkts:
out.extend(self.encap_pkt(p))
last_pkt = self.finish_encap()
if last_pkt != None:
out.append(last_pkt)
return out
def start_encap(self):
return None
def finish_encap(self):
out = None
if len(self.tx_chunks) > 0:
out = self.flush_tx_chunks()
return out
""" TestPvti is a subclass of VPPTestCase classes.
PVTI test.
"""
def get_field_bytes(pkt, name):
fld, val = pkt.getfield_and_val(name)
return fld.i2m(pkt, val)
class VppPvtiInterface(VppInterface):
"""
VPP PVTI interface
"""
def __init__(
self, test, local_ip, local_port, remote_ip, remote_port, underlay_mtu=1500
):
super(VppPvtiInterface, self).__init__(test)
self.local_ip = local_ip
self.local_port = local_port
self.remote_ip = remote_ip
self.remote_port = remote_port
self.underlay_mtu = underlay_mtu
def get_ende(self, for_rx_test=False):
return PvtiEnDe(
self.local_ip,
self.local_port,
self.remote_ip,
self.remote_port,
self.underlay_mtu,
for_rx_test,
)
def verify_encap_packets(self, orig_pkts, recv_pkts):
ende = self.get_ende()
recv2_pkts = ende.encap_packets(orig_pkts)
out1 = []
out2 = []
for i, pkt in enumerate(recv_pkts):
if IP in pkt:
rx_pkt = pkt[IP]
elif IPv6 in pkt:
rx_pkt = pkt[IPv6]
else:
raise "Neither IPv4 nor IPv6"
py_pkt = recv2_pkts[i]
if rx_pkt != py_pkt:
eprint("received packet:")
rx_pkt.show()
eprint("python packet:")
py_pkt.show()
out1.append(rx_pkt)
out2.append(py_pkt)
return (out1, out2)
def add_vpp_config(self):
r = self.test.vapi.pvti_interface_create(
interface={
"local_ip": self.local_ip,
"local_port": self.local_port,
"remote_ip": self.remote_ip,
"remote_port": self.remote_port,
"underlay_mtu": self.underlay_mtu,
}
)
self.set_sw_if_index(r.sw_if_index)
self.test.registry.register(self, self.test.logger)
return self
def remove_vpp_config(self):
self.test.vapi.pvti_interface_delete(sw_if_index=self._sw_if_index)
def query_vpp_config(self):
ts = self.test.vapi.pvti_interface_dump(sw_if_index=0xFFFFFFFF)
for t in ts:
if (
t.interface.sw_if_index == self._sw_if_index
and str(t.interface.local_ip) == self.local_ip
and t.interface.local_port == self.local_port
and t.interface.remote_port == self.remote_port
and str(t.interface.remote_ip) == self.remote_ip
):
self.test.logger.info("QUERY AYXX: true")
return True
return False
def __str__(self):
return self.object_id()
def object_id(self):
return "pvti-%d" % self._sw_if_index
@unittest.skipIf("pvti" in config.excluded_plugins, "Exclude PVTI plugin tests")
# @tag_run_solo
class TestPvti(VppTestCase):
"""Packet Vector Tunnel Interface (PVTI) Test Case"""
error_str = compile(r"Error")
# maxDiff = None
wg4_output_node_name = "/err/wg4-output-tun/"
wg4_input_node_name = "/err/wg4-input/"
wg6_output_node_name = "/err/wg6-output-tun/"
wg6_input_node_name = "/err/wg6-input/"
kp4_error = wg4_output_node_name + "Keypair error"
mac4_error = wg4_input_node_name + "Invalid MAC handshake"
peer4_in_err = wg4_input_node_name + "Peer error"
peer4_out_err = wg4_output_node_name + "Peer error"
kp6_error = wg6_output_node_name + "Keypair error"
mac6_error = wg6_input_node_name + "Invalid MAC handshake"
peer6_in_err = wg6_input_node_name + "Peer error"
peer6_out_err = wg6_output_node_name + "Peer error"
cookie_dec4_err = wg4_input_node_name + "Failed during Cookie decryption"
cookie_dec6_err = wg6_input_node_name + "Failed during Cookie decryption"
ratelimited4_err = wg4_input_node_name + "Handshake ratelimited"
ratelimited6_err = wg6_input_node_name + "Handshake ratelimited"
@classmethod
def setUpClass(cls):
super(TestPvti, cls).setUpClass()
try:
cls.create_pg_interfaces(range(2))
for i in cls.pg_interfaces:
i.admin_up()
i.config_ip4()
i.config_ip6()
i.resolve_arp()
i.resolve_ndp()
except Exception:
super(TestPvti, cls).tearDownClass()
raise
@classmethod
def tearDownClass(cls):
super(TestPvti, cls).tearDownClass()
def setUp(self):
super(VppTestCase, self).setUp()
self.base_kp4_err = self.statistics.get_err_counter(self.kp4_error)
self.base_mac4_err = self.statistics.get_err_counter(self.mac4_error)
self.base_peer4_in_err = self.statistics.get_err_counter(self.peer4_in_err)
self.base_peer4_out_err = self.statistics.get_err_counter(self.peer4_out_err)
self.base_kp6_err = self.statistics.get_err_counter(self.kp6_error)
self.base_mac6_err = self.statistics.get_err_counter(self.mac6_error)
self.base_peer6_in_err = self.statistics.get_err_counter(self.peer6_in_err)
self.base_peer6_out_err = self.statistics.get_err_counter(self.peer6_out_err)
self.base_cookie_dec4_err = self.statistics.get_err_counter(
self.cookie_dec4_err
)
self.base_cookie_dec6_err = self.statistics.get_err_counter(
self.cookie_dec6_err
)
self.base_ratelimited4_err = self.statistics.get_err_counter(
self.ratelimited4_err
)
self.base_ratelimited6_err = self.statistics.get_err_counter(
self.ratelimited6_err
)
def create_packets(
self, src_ip_if, count=1, size=150, for_rx=False, is_ip6=False, af_mix=False
):
pkts = []
total_packet_count = count
padstr0 = ""
padstr1 = ""
for i in range(0, 2000):
padstr0 = padstr0 + (".%03x" % i)
padstr1 = padstr1 + ("+%03x" % i)
for i in range(0, total_packet_count):
if af_mix:
is_ip6 = i % 2 == 1
dst_mac = src_ip_if.local_mac
src_mac = src_ip_if.remote_mac
if for_rx:
dst_ip4 = src_ip_if.remote_ip4
dst_ip6 = src_ip_if.remote_ip6
src_ip4 = "10.0.%d.4" % i
src_ip6 = "2001:db8::%x" % i
else:
src_ip4 = src_ip_if.remote_ip4
src_ip6 = src_ip_if.remote_ip6
dst_ip4 = "10.0.%d.4" % i
dst_ip6 = "2001:db8::%x" % i
src_l4 = 1234 + i
dst_l4 = 4321 + i
ulp = UDP(sport=src_l4, dport=dst_l4)
payload = "test pkt #%d" % i
if i % 2 == 1:
padstr = padstr1
else:
padstr = padstr0
p = Ether(dst=dst_mac, src=src_mac)
if is_ip6:
p /= IPv6(src=src_ip6, dst=dst_ip6)
else:
p /= IP(src=src_ip4, dst=dst_ip4, frag=0, flags=0)
p /= ulp / Raw(payload)
if i % 2 == 1 or total_packet_count == 1:
self.extend_packet(p, size, padstr)
else:
self.extend_packet(p, 150, padstr)
pkts.append(p)
return pkts
def add_rx_ether_header(self, in_pkts, rx_intf=None):
out = []
if rx_intf is None:
rx_intf = self.pg0
dst_mac = rx_intf.local_mac
src_mac = rx_intf.remote_mac
pkts = []
for p in in_pkts:
p0 = Ether(dst=dst_mac, src=src_mac) / p[IP]
out.append(p0)
return out
def encap_for_rx_test(self, pkts, rx_intf=None):
ende = self.pvti0.get_ende(for_rx_test=True)
encap_pkts = ende.encap_packets(pkts)
return self.add_rx_ether_header(encap_pkts, rx_intf)
def decrement_ttl_and_build(self, send_pkts):
out = []
pkts = copy.deepcopy(send_pkts)
for p in pkts:
p[IP].ttl = p[IP].ttl - 1
out.append(Ether(p.build()))
return out
def create_rx_packets(self, dst_ip_if, rx_intf=None, count=1, size=150):
pkts = []
total_packet_count = count
padstr = ""
if rx_intf is None:
rx_intf = self.pg0
for i in range(0, 2000):
padstr = padstr + (".%03x" % i)
dst_mac = rx_intf.local_mac
src_mac = rx_intf.remote_mac
for i in range(0, total_packet_count):
dst_ip4 = dst_ip_if.remote_ip4
src_ip4 = "10.0.%d.4" % i
src_l4 = 1234 + i
dst_l4 = 4321 + i
ulp = UDP(sport=src_l4, dport=dst_l4)
payload = "test"
# if i % 2 == 1 or total_packet_count == 1:
# self.extend_packet(p, size, padstr)
# else:
# self.extend_packet(p, 150, padstr)
pvti = PVTI(seq=42 + i, chunks=[])
for j in range(0, 32):
p = (
IP(src=src_ip4, dst=dst_ip4, frag=0, flags=0, id=j + 0x4000)
/ ulp
/ Raw(payload)
)
chunk0 = PVTIChunk(data=raw(p))
pvti.chunks.append(chunk0)
p = (
Ether(dst=dst_mac, src=src_mac)
/ IP(src="192.0.2.1", dst=rx_intf.local_ip4, id=0x3000 + i)
/ UDP(sport=12312, dport=12312)
/ pvti
)
# p.show()
# Ether(raw(p)).show()
pkts.append(p)
return pkts
def send_and_assert_no_replies_ignoring_init(
self, intf, pkts, remark="", timeout=None
):
self.pg_send(intf, pkts)
def _filter_out_fn(p):
return is_ipv6_misc(p) or is_handshake_init(p)
try:
if not timeout:
timeout = 1
for i in self.pg_interfaces:
i.assert_nothing_captured(
timeout=timeout, remark=remark, filter_out_fn=_filter_out_fn
)
timeout = 0.1
finally:
pass
def test_0000_pvti_interface(self):
"""Simple interface creation"""
local_port = 12312
peer_addr = self.pg0.remote_ip4 # "192.0.2.1"
peer_port = 31234
peer_port = 12312
# Create interface
pvti0 = VppPvtiInterface(
self, self.pg1.local_ip4, local_port, peer_addr, peer_port
).add_vpp_config()
self.logger.info(self.vapi.cli("sh int"))
self.logger.info(self.vapi.cli("show pvti interface"))
self.logger.info(self.vapi.cli("show pvti tx peers"))
self.logger.info(self.vapi.cli("show pvti rx peers"))
# delete interface
pvti0.remove_vpp_config()
# self.logger.info(self.vapi.cli("show pvti interface"))
# pvti0.add_vpp_config()
def test_0001_pvti_send_simple_1pkt(self):
"""v4o4 TX: Simple packet: 1 -> 1"""
self.prepare_for_test("v4o4_1pkt_simple")
pkts = self.create_packets(self.pg1)
recv_pkts = self.send_and_expect(self.pg1, pkts, self.pg0)
for p in recv_pkts:
self.logger.info(p)
c_pkts, py_pkts = self.pvti0.verify_encap_packets(pkts, recv_pkts)
self.assertEqual(c_pkts, py_pkts)
self.cleanup_after_test()
def test_0101_pvti_send_simple_1pkt(self):
"""v6o4 TX: Simple packet: 1 -> 1"""
self.prepare_for_test("v6o4_1pkt_simple")
pkts = self.create_packets(self.pg1, is_ip6=True)
recv_pkts = self.send_and_expect(self.pg1, pkts, self.pg0, n_rx=1)
for p in recv_pkts:
self.logger.info(p)
c_pkts, py_pkts = self.pvti0.verify_encap_packets(pkts, recv_pkts)
self.assertEqual(c_pkts, py_pkts)
self.cleanup_after_test()
def test_0002_pvti_send_simple_2pkt(self):
"""TX: Simple packet: 2 -> 1"""
self.prepare_for_test("2pkt_simple")
send_pkts = self.create_packets(self.pg1, count=2)
pkts = copy.deepcopy(send_pkts)
rx = self.send_and_expect(self.pg1, pkts, self.pg0, n_rx=1)
for p in rx:
self.logger.info(p)
# p.show()
payload0 = rx[0][PVTI].chunks[0].data
payload1 = rx[0][PVTI].chunks[1].data
pktA0 = IP(payload0)
pktA1 = IP(payload1)
p0 = pkts[0][IP]
p0.ttl = p0.ttl - 1
pktB0 = IP(p0.build())
p1 = pkts[1][IP]
p1.ttl = p1.ttl - 1
pktB1 = IP(p1.build())
self.assertEqual(pktA0, pktB0)
self.assertEqual(pktA1, pktB1)
c_pkts, py_pkts = self.pvti0.verify_encap_packets(send_pkts, rx)
self.assertEqual(c_pkts, py_pkts)
self.cleanup_after_test()
def prepare_for_test(self, test_name, underlay_mtu=1500, is_ip6=False):
local_port = 12312
peer_ip4_addr = "192.0.2.1"
peer_ip6_addr = "2001:db8:dead::1"
peer_port = 31234
peer_port = 12312
for i in self.pg_interfaces:
i.test_name = test_name
if is_ip6:
self.pvti0 = VppPvtiInterface(
self,
self.pg1.local_ip6,
local_port,
peer_ip6_addr,
peer_port,
underlay_mtu,
).add_vpp_config()
else:
self.pvti0 = VppPvtiInterface(
self,
self.pg1.local_ip4,
local_port,
peer_ip4_addr,
peer_port,
underlay_mtu,
).add_vpp_config()
self.pvti0.config_ip4()
self.pvti0.config_ip6()
self.pvti0.admin_up()
self.logger.info(self.vapi.cli("ip route add 0.0.0.0/0 via 172.16.3.3"))
## FIXME: using direct "interface" below results in blackouts. intermittently.
# self.logger.info(self.vapi.cli("ip route 0.0.0.0/0 via pvti0"))
self.logger.info(self.vapi.cli("ip route add ::/0 via pvti0"))
self.logger.info(self.vapi.cli("ip route add 192.0.2.1/32 via pg0"))
self.logger.info(self.vapi.cli("ip neighbor pg0 192.0.2.1 000c.0102.0304"))
self.logger.info(self.vapi.cli("ip route 2001:db8:dead::1/128 via pg0"))
self.logger.info(
self.vapi.cli("ip neighbor pg0 2001:db8:dead::1 000c.0102.0304")
)
self.logger.info(self.vapi.cli("ip neighbor pg1 172.16.2.2 000c.0102.0304"))
self.logger.info(self.vapi.cli("sh int"))
self.logger.info(self.vapi.cli("sh ip fib"))
self.logger.info(self.vapi.cli("show pvti interface"))
self.logger.info(self.vapi.cli("set interface ip pvti-bypass pg0"))
def cleanup_after_test(self):
self.logger.info(self.vapi.cli("ip neighbor del pg0 192.0.2.1 000c.0102.0304"))
self.logger.info(self.vapi.cli("ip neighbor del pg1 172.16.2.2 000c.0102.0304"))
self.logger.info(self.vapi.cli("ip route del 192.0.2.1/32 via pg0"))
# self.logger.info(self.vapi.cli("ip route del 0.0.0.0/0 via pvti0"))
self.logger.info(self.vapi.cli("ip route del ::/0 via pvti0"))
self.logger.info(self.vapi.cli("sh int"))
self.logger.info(self.vapi.cli("show pvti interface"))
self.pvti0.remove_vpp_config()
def test_0003_pvti_send_simple_1pkt_big(self):
"""TX: Simple big packet: 1 -> 2"""
self.prepare_for_test("1big_pkt")
send_pkts = self.create_packets(self.pg1, count=1, size=1900)
pkts = copy.deepcopy(send_pkts)
self.logger.info("count: ")
self.logger.info(len(pkts))
rx = self.send_and_expect(self.pg1, pkts, self.pg0, n_rx=2)
for p in rx:
self.logger.info(p)
self.logger.info(len(p[PVTI].chunks[0].data))
# p.show()
payload = rx[0][PVTI].chunks[0].data + rx[1][PVTI].chunks[0].data
pkt1 = IP(payload)
p0 = pkts[0][IP]
p0.ttl = p0.ttl - 1
pkt0 = IP(p0.build())
self.assertEqual(pkt0, pkt1)
c_pkts, py_pkts = self.pvti0.verify_encap_packets(send_pkts, rx)
self.assertEqual(c_pkts, py_pkts)
self.cleanup_after_test()
def test_0004_pvti_send_simple_5pkt_big(self):
"""v4o4 TX: Simple big packets: 5 -> 2"""
self.prepare_for_test("v4o4_5big_pkt")
send_pkts = self.create_packets(self.pg1, count=5, size=1050)
self.logger.info("count: %d " % len(send_pkts))
# self.logger.info(len(pkts))
rx = self.send_and_expect(self.pg1, send_pkts, self.pg0, n_rx=2)
for p in rx:
self.logger.info(p)
self.logger.info(len(p[PVTI].chunks[0].data))
# p.show()
c_pkts, py_pkts = self.pvti0.verify_encap_packets(send_pkts, rx)
self.assertEqual(c_pkts, py_pkts)
self.cleanup_after_test()
def test_0104_pvti_send_simple_5pkt_big(self):
"""v6o4 TX: Simple big packets: 5 -> 2"""
self.prepare_for_test("v4o4_5big_pkt")
send_pkts = self.create_packets(self.pg1, count=5, size=1050, is_ip6=True)
self.logger.info("count: %d " % len(send_pkts))
# self.logger.info(len(pkts))
rx = self.send_and_expect(self.pg1, send_pkts, self.pg0, n_rx=2)
for p in rx:
self.logger.info(p)
self.logger.info(len(p[PVTI].chunks[0].data))
# p.show()
c_pkts, py_pkts = self.pvti0.verify_encap_packets(send_pkts, rx)
self.assertEqual(c_pkts, py_pkts)
self.cleanup_after_test()
def Xtest_0204_pvti_send_simple_5pkt_mix(self):
"""vXo4 TX: Simple packets mix: 5 -> 2"""
# FIXME: This test is disabled for now, but left here, to have this comment
# The mix of IPv4 and IPv6 packets in VPP will forward two
# different graphs, so after encap it will result in two
# PV packets: one with IPv4 chunks, and one with IPv6 chunks.
# The python test encapsulator does not do this, and it is probably
# a useless idea to introduce attempts to mimic this behavior,
# because in any case one can not expect the orderly scheduling
# of IPv4 vs IPv6 graph processing.
self.prepare_for_test("vXo4_5big_pkt")
send_pkts = self.create_packets(self.pg1, count=5, size=1050, af_mix=True)
# self.logger.info(len(pkts))
rx = self.send_and_expect(self.pg1, send_pkts, self.pg0, n_rx=2)
for p in rx:
self.logger.info(p)
self.logger.info(len(p[PVTI].chunks[0].data))
c_pkts, py_pkts = self.pvti0.verify_encap_packets(send_pkts, rx)
self.assertEqual(c_pkts, py_pkts)
self.cleanup_after_test()
def test_0005_pvti_send_mix_3pkt_medium_mtu(self):
"""TX: small+big+small packets over medium mtu: 3 -> 3"""
self.prepare_for_test("3pkt_small_mtu", underlay_mtu=400)
send_pkts = self.create_packets(self.pg1, count=3, size=500)
pkts = copy.deepcopy(send_pkts)
self.logger.info("count: %d " % len(send_pkts))
# self.logger.info(len(pkts))
rx = self.send_and_expect(self.pg1, send_pkts, self.pg0, n_rx=3)
for p in rx:
self.logger.info(p)
self.logger.info(len(p[PVTI].chunks[0].data))
# p.show()
# check the middle chunk which is spread across two packets
payload = rx[0][PVTI].chunks[1].data + rx[1][PVTI].chunks[0].data
pkt1 = IP(payload)
p0 = pkts[1][IP]
p0.ttl = p0.ttl - 1
pkt0 = IP(p0.build())
self.assertEqual(pkt0, pkt1)
c_pkts, py_pkts = self.pvti0.verify_encap_packets(send_pkts, rx)
self.assertEqual(c_pkts, py_pkts)
self.cleanup_after_test()
def test_0006_pvti_send_mix_4pkt_medium_mtu(self):
"""TX: small+big+small packets over 600 mtu: 4 -> 3"""
self.prepare_for_test("6pkt_small_mtu", underlay_mtu=600)
send_pkts = self.create_packets(self.pg1, count=4, size=500)
pkts = copy.deepcopy(send_pkts)
# self.logger.info(len(pkts))
rx = self.send_and_expect(self.pg1, send_pkts, self.pg0, n_rx=3)
for p in rx:
self.logger.info(p)
self.logger.info(len(p[PVTI].chunks[0].data))
# p.show()
# check the middle chunk which is spread across two packets
payload = rx[0][PVTI].chunks[1].data + rx[1][PVTI].chunks[0].data
pkt1 = IP(payload)
p0 = pkts[1][IP]
p0.ttl = p0.ttl - 1
pkt0 = IP(p0.build())
self.assertEqual(pkt0, pkt1)
c_pkts, py_pkts = self.pvti0.verify_encap_packets(send_pkts, rx)
self.assertEqual(c_pkts, py_pkts)
self.cleanup_after_test()
def test_0007_pvti_send_simple_1_3_pkt(self):
"""TX: Simple packet: 1 -> 3, small mtu"""
self.prepare_for_test("1_3_pkt_simple", underlay_mtu=520)
send_pkts = self.create_packets(self.pg1, count=1, size=1400)
pkts = copy.deepcopy(send_pkts)
rx = self.send_and_expect(self.pg1, pkts, self.pg0, n_rx=3)
for p in rx:
self.logger.info(p)
c_pkts, py_pkts = self.pvti0.verify_encap_packets(send_pkts, rx)
self.assertEqual(c_pkts, py_pkts)
self.cleanup_after_test()
def test_0008_pvti_chained_1_3_pkt(self):
"""TX: Chained packet: 2700 byte 1 -> 3, mtu 1000"""
self.prepare_for_test("1_3_pkt_simple", underlay_mtu=1000)
send_pkts = self.create_packets(self.pg1, count=1, size=2700)
pkts = copy.deepcopy(send_pkts)
pkt0 = Ether(raw(pkts[0]))[IP]
rx = self.send_and_expect(self.pg1, send_pkts, self.pg0, n_rx=3)
for p in rx:
self.logger.info(p)
p0 = pkts[0][IP]
p0.ttl = p0.ttl - 1
pkt0 = IP(p0.build())
payload = (
rx[0][PVTI].chunks[0].data
+ rx[1][PVTI].chunks[0].data
+ rx[2][PVTI].chunks[0].data
# + rx[2][PVTI].chunks[1].data
)
pkt1 = IP(payload)
self.assertEqual(pkt0, pkt1)
# FIXME: this will fail because the send path
# does not combine the data from two chained blocks.
# when this succeeds, the above checks in this testcase will need to be redone
# c_pkts, py_pkts = self.pvti0.verify_encap_packets(send_pkts, rx)
# self.assertEqual(c_pkts, py_pkts)
self.cleanup_after_test()
def test_1001_pvti_rx_simple_1pkt(self):
"""RX: Simple packet: 1 -> 32"""
self.prepare_for_test("1pkt_rx_simple")
pkts = self.create_rx_packets(self.pg1, rx_intf=self.pg0)
self.logger.info(self.vapi.cli("show pvti interface"))
self.logger.info(self.vapi.cli("show udp ports"))
recv_pkts = self.send_and_expect(self.pg0, pkts, self.pg1, n_rx=32)
for p in recv_pkts:
self.logger.info(p)
self.cleanup_after_test()
def test_1002_pvti_rx_big_1buf(self):
"""RX: Orig Big packet, single buf: 2 -> 1"""
self.prepare_for_test("1buf_rx_big")
pkts_orig = self.create_packets(self.pg1, count=1, size=1900, for_rx=True)
pkts = self.encap_for_rx_test(pkts_orig, rx_intf=self.pg0)
self.logger.info(self.vapi.cli("show pvti interface"))
self.logger.info(self.vapi.cli("show udp ports"))
known_good_pkts = self.decrement_ttl_and_build(pkts_orig)
recv_pkts = self.send_and_expect(self.pg0, pkts, self.pg1, n_rx=1)
for i, p in enumerate(recv_pkts):
self.logger.info(p)
self.assertEqual(p[IP], known_good_pkts[i][IP])
self.cleanup_after_test()
def test_1003_pvti_rx_big_2buf(self):
"""RX: Very Big packet, chained buf: 3 -> 1"""
self.prepare_for_test("2buf_rx_big")
pkts_orig = self.create_packets(self.pg1, count=1, size=3000, for_rx=True)
pkts = self.encap_for_rx_test(pkts_orig, rx_intf=self.pg0)
self.logger.info(self.vapi.cli("show pvti interface"))
self.logger.info(self.vapi.cli("show udp ports"))
known_good_pkts = self.decrement_ttl_and_build(pkts_orig)
recv_pkts = self.send_and_expect(self.pg0, pkts, self.pg1, n_rx=1)
for i, p in enumerate(recv_pkts):
self.logger.info(p)
if p[IP] != known_good_pkts[i][IP]:
p[IP].show()
known_good_pkts[i][IP].show()
self.assertEqual(p[IP], known_good_pkts[i][IP])
self.cleanup_after_test()
def test_1004_pvti_rx_big_2buf_and_small(self):
"""RX: Very Big packet, chained buf: 3 -> 1 + small pkt"""
self.prepare_for_test("2buf_rx_big_and_small")
pkts_orig = self.create_packets(self.pg1, count=2, size=3000, for_rx=True)
pkts = self.encap_for_rx_test(pkts_orig, rx_intf=self.pg0)
self.logger.info(self.vapi.cli("show pvti interface"))
self.logger.info(self.vapi.cli("show udp ports"))
known_good_pkts = self.decrement_ttl_and_build(pkts_orig)
recv_pkts = self.send_and_expect(self.pg0, pkts, self.pg1, n_rx=2)
for i, p in enumerate(recv_pkts):
self.logger.info(p)
if p[IP] != known_good_pkts[i][IP]:
p[IP].show()
known_good_pkts[i][IP].show()
self.assertEqual(p[IP], known_good_pkts[i][IP])
self.cleanup_after_test()
def test_1005_pvti_rx_big_2buf_and_small_drop(self):
"""RX: Very Big packet, chained buf: 3 -> 1 + small pkt, encap pkt lost"""
self.prepare_for_test("2buf_rx_big_and_small_drop")
pkts_orig = self.create_packets(self.pg1, count=3, size=3000, for_rx=True)
pkts = self.encap_for_rx_test(pkts_orig, rx_intf=self.pg0)
# drop the second packet after encapsulation (the one with the second frag of the large packet)
pkts.pop(1)
self.logger.info(self.vapi.cli("show pvti interface"))
self.logger.info(self.vapi.cli("show udp ports"))
known_good_pkts = self.decrement_ttl_and_build(pkts_orig)
# drop the large original packet, leaving just two small ones
known_good_pkts.pop(1)
recv_pkts = self.send_and_expect(self.pg0, pkts, self.pg1, n_rx=2)
for i, p in enumerate(recv_pkts):
self.logger.info(p)
if p[IP] != known_good_pkts[i][IP]:
p[IP].show()
known_good_pkts[i][IP].show()
self.assertEqual(p[IP], known_good_pkts[i][IP])
self.cleanup_after_test()
def test_1006_pvti_rx_big_2buf_and_small_drop2(self):
"""RX: Very Big packet, chained buf: 3 -> 1 + small pkt, non-initial frag pkt lost"""
self.prepare_for_test("2buf_rx_big_and_small_drop2")
pkts_orig = self.create_packets(self.pg1, count=3, size=6000, for_rx=True)
pkts = self.encap_for_rx_test(pkts_orig, rx_intf=self.pg0)
# drop the second packet after encapsulation (the one with the second frag of the large packet)
pkts.pop(2)
self.logger.info(self.vapi.cli("show pvti interface"))
self.logger.info(self.vapi.cli("show udp ports"))
known_good_pkts = self.decrement_ttl_and_build(pkts_orig)
# drop the large original packet, leaving just two small ones
known_good_pkts.pop(1)
recv_pkts = self.send_and_expect(self.pg0, pkts, self.pg1, n_rx=2)
for i, p in enumerate(recv_pkts):
self.logger.info(p)
if p[IP] != known_good_pkts[i][IP]:
p[IP].show()
known_good_pkts[i][IP].show()
self.assertEqual(p[IP], known_good_pkts[i][IP])
self.cleanup_after_test()
class PvtiHandoffTests(TestPvti):
"""Pvti Tests in multi worker setup"""
vpp_worker_count = 2
def xtest_wg_peer_init(self):
"""Handoff"""
port = 12383
# Create interfaces
wg0 = VppWgInterface(self, self.pg1.local_ip4, port).add_vpp_config()
wg0.admin_up()
wg0.config_ip4()
self.pg_enable_capture(self.pg_interfaces)
self.pg_start()
peer_1 = VppWgPeer(
self, wg0, self.pg1.remote_ip4, port + 1, ["10.11.2.0/24", "10.11.3.0/24"]
).add_vpp_config()
self.assertEqual(len(self.vapi.wireguard_peers_dump()), 1)
r1 = VppIpRoute(
self, "10.11.3.0", 24, [VppRoutePath("10.11.3.1", wg0.sw_if_index)]
).add_vpp_config()
# skip the first automatic handshake
self.pg1.get_capture(1, timeout=HANDSHAKE_JITTER)
# send a valid handsake init for which we expect a response
p = peer_1.mk_handshake(self.pg1)
rx = self.send_and_expect(self.pg1, [p], self.pg1)
peer_1.consume_response(rx[0])
# send a data packet from the peer through the tunnel
# this completes the handshake and pins the peer to worker 0
p = (
IP(src="10.11.3.1", dst=self.pg0.remote_ip4, ttl=20)
/ UDP(sport=222, dport=223)
/ Raw()
)
d = peer_1.encrypt_transport(p)
p = peer_1.mk_tunnel_header(self.pg1) / (
Pvti(message_type=4, reserved_zero=0)
/ PvtiTransport(
receiver_index=peer_1.sender, counter=0, encrypted_encapsulated_packet=d
)
)
rxs = self.send_and_expect(self.pg1, [p], self.pg0, worker=0)
for rx in rxs:
self.assertEqual(rx[IP].dst, self.pg0.remote_ip4)
self.assertEqual(rx[IP].ttl, 19)
# send a packets that are routed into the tunnel
# and pins the peer tp worker 1
pe = (
Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
/ IP(src=self.pg0.remote_ip4, dst="10.11.3.2")
/ UDP(sport=555, dport=556)
/ Raw(b"\x00" * 80)
)
rxs = self.send_and_expect(self.pg0, pe * 255, self.pg1, worker=1)
peer_1.validate_encapped(rxs, pe)
# send packets into the tunnel, from the other worker
p = [
(
peer_1.mk_tunnel_header(self.pg1)
/ Pvti(message_type=4, reserved_zero=0)
/ PvtiTransport(
receiver_index=peer_1.sender,
counter=ii + 1,
encrypted_encapsulated_packet=peer_1.encrypt_transport(
(
IP(src="10.11.3.1", dst=self.pg0.remote_ip4, ttl=20)
/ UDP(sport=222, dport=223)
/ Raw()
)
),
)
)
for ii in range(255)
]
rxs = self.send_and_expect(self.pg1, p, self.pg0, worker=1)
for rx in rxs:
self.assertEqual(rx[IP].dst, self.pg0.remote_ip4)
self.assertEqual(rx[IP].ttl, 19)
# send a packets that are routed into the tunnel
# from worker 0
rxs = self.send_and_expect(self.pg0, pe * 255, self.pg1, worker=0)
peer_1.validate_encapped(rxs, pe)
r1.remove_vpp_config()
peer_1.remove_vpp_config()
wg0.remove_vpp_config()