Files
vpp/test/test_map.py
Ole Troan f159f58dbf test framework: add factory function and default parameters
This is the first step to be able to remove all the API message
wrappers in vpp_papi_provider.py. This allows to remove all
functions that do not override parameters (different from zero),
and a separate dictionary for messages requiring different defaults.

The general requirement is that all new tests should use named
arguments directly. Not positional arguments through the wrapper.

Note when removing functions, the calls in vpp_papi_provider
wrappers do not necessarily follow message order.

Change-Id: If64916c07f8622c138db3a9d7c4a98b93a058e68
Signed-off-by: Ole Troan <ot@cisco.com>
2019-03-04 14:31:08 +00:00

394 lines
14 KiB
Python

#!/usr/bin/env python
import unittest
from framework import VppTestCase, VppTestRunner
from vpp_ip import DpoProto
from vpp_ip_route import VppIpRoute, VppRoutePath
from scapy.layers.l2 import Ether, Raw
from scapy.layers.inet import IP, UDP, ICMP, TCP, fragment
from scapy.layers.inet6 import IPv6, ICMPv6TimeExceeded
class TestMAP(VppTestCase):
""" MAP Test Case """
def setUp(self):
super(TestMAP, self).setUp()
# create 2 pg interfaces
self.create_pg_interfaces(range(4))
# pg0 is 'inside' IPv4
self.pg0.admin_up()
self.pg0.config_ip4()
self.pg0.resolve_arp()
# pg1 is 'outside' IPv6
self.pg1.admin_up()
self.pg1.config_ip6()
self.pg1.generate_remote_hosts(4)
self.pg1.configure_ipv6_neighbors()
def tearDown(self):
super(TestMAP, self).tearDown()
for i in self.pg_interfaces:
i.unconfig_ip4()
i.unconfig_ip6()
i.admin_down()
def send_and_assert_encapped(self, tx, ip6_src, ip6_dst, dmac=None):
if not dmac:
dmac = self.pg1.remote_mac
self.pg0.add_stream(tx)
self.pg_enable_capture(self.pg_interfaces)
self.pg_start()
rx = self.pg1.get_capture(1)
rx = rx[0]
self.assertEqual(rx[Ether].dst, dmac)
self.assertEqual(rx[IP].src, tx[IP].src)
self.assertEqual(rx[IPv6].src, ip6_src)
self.assertEqual(rx[IPv6].dst, ip6_dst)
def test_map_e(self):
""" MAP-E """
#
# Add a route to the MAP-BR
#
map_br_pfx = "2001::"
map_br_pfx_len = 64
map_route = VppIpRoute(self,
map_br_pfx,
map_br_pfx_len,
[VppRoutePath(self.pg1.remote_ip6,
self.pg1.sw_if_index,
proto=DpoProto.DPO_PROTO_IP6)],
is_ip6=1)
map_route.add_vpp_config()
#
# Add a domain that maps from pg0 to pg1
#
map_dst = '2001::/64'
map_src = '3000::1/128'
client_pfx = '192.168.0.0/16'
self.vapi.map_add_domain(map_dst, client_pfx, map_src)
# Enable MAP on interface.
self.vapi.map_if_enable_disable(is_enable=1,
sw_if_index=self.pg0.sw_if_index,
is_translation=0)
# Ensure MAP doesn't steal all packets!
v4 = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
IP(src=self.pg0.remote_ip4, dst=self.pg0.remote_ip4) /
UDP(sport=20000, dport=10000) /
Raw('\xa5' * 100))
rx = self.send_and_expect(self.pg0, v4*1, self.pg0)
v4_reply = v4[1]
v4_reply.ttl -= 1
for p in rx:
self.validate(p[1], v4_reply)
#
# Fire in a v4 packet that will be encapped to the BR
#
v4 = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
IP(src=self.pg0.remote_ip4, dst='192.168.1.1') /
UDP(sport=20000, dport=10000) /
Raw('\xa5' * 100))
self.send_and_assert_encapped(v4, "3000::1", "2001::c0a8:0:0")
# Enable MAP on interface.
self.vapi.map_if_enable_disable(is_enable=1,
sw_if_index=self.pg1.sw_if_index,
is_translation=0)
# Ensure MAP doesn't steal all packets
v6 = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
IPv6(src=self.pg1.remote_ip6, dst=self.pg1.remote_ip6) /
UDP(sport=20000, dport=10000) /
Raw('\xa5' * 100))
rx = self.send_and_expect(self.pg1, v6*1, self.pg1)
v6_reply = v6[1]
v6_reply.hlim -= 1
for p in rx:
self.validate(p[1], v6_reply)
#
# Fire in a V6 encapped packet.
# expect a decapped packet on the inside ip4 link
#
p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
IPv6(dst='3000::1', src="2001::1") /
IP(dst=self.pg0.remote_ip4, src='192.168.1.1') /
UDP(sport=20000, dport=10000) /
Raw('\xa5' * 100))
self.pg1.add_stream(p)
self.pg_enable_capture(self.pg_interfaces)
self.pg_start()
rx = self.pg0.get_capture(1)
rx = rx[0]
self.assertFalse(rx.haslayer(IPv6))
self.assertEqual(rx[IP].src, p[IP].src)
self.assertEqual(rx[IP].dst, p[IP].dst)
#
# Pre-resolve. No API for this!!
#
self.vapi.ppcli("map params pre-resolve ip6-nh 4001::1")
self.send_and_assert_no_replies(self.pg0, v4,
"resovled via default route")
#
# Add a route to 4001::1. Expect the encapped traffic to be
# sent via that routes next-hop
#
pre_res_route = VppIpRoute(
self, "4001::1", 128,
[VppRoutePath(self.pg1.remote_hosts[2].ip6,
self.pg1.sw_if_index,
proto=DpoProto.DPO_PROTO_IP6)],
is_ip6=1)
pre_res_route.add_vpp_config()
self.send_and_assert_encapped(v4, "3000::1",
"2001::c0a8:0:0",
dmac=self.pg1.remote_hosts[2].mac)
#
# change the route to the pre-solved next-hop
#
pre_res_route.modify([VppRoutePath(self.pg1.remote_hosts[3].ip6,
self.pg1.sw_if_index,
proto=DpoProto.DPO_PROTO_IP6)])
pre_res_route.add_vpp_config()
self.send_and_assert_encapped(v4, "3000::1",
"2001::c0a8:0:0",
dmac=self.pg1.remote_hosts[3].mac)
#
# cleanup. The test infra's object registry will ensure
# the route is really gone and thus that the unresolve worked.
#
pre_res_route.remove_vpp_config()
self.vapi.ppcli("map params pre-resolve del ip6-nh 4001::1")
def validate(self, rx, expected):
self.assertEqual(rx, expected.__class__(str(expected)))
def payload(self, len):
return 'x' * len
def test_map_t(self):
""" MAP-T """
#
# Add a domain that maps from pg0 to pg1
#
map_dst = '2001:db8::/32'
map_src = '1234:5678:90ab:cdef::/64'
ip4_pfx = '192.168.0.0/24'
self.vapi.map_add_domain(map_dst, ip4_pfx, map_src,
16, 6, 4, mtu=1500)
# Enable MAP-T on interfaces.
self.vapi.map_if_enable_disable(is_enable=1,
sw_if_index=self.pg0.sw_if_index,
is_translation=1)
self.vapi.map_if_enable_disable(is_enable=1,
sw_if_index=self.pg1.sw_if_index,
is_translation=1)
# Ensure MAP doesn't steal all packets!
v4 = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
IP(src=self.pg0.remote_ip4, dst=self.pg0.remote_ip4) /
UDP(sport=20000, dport=10000) /
Raw('\xa5' * 100))
rx = self.send_and_expect(self.pg0, v4*1, self.pg0)
v4_reply = v4[1]
v4_reply.ttl -= 1
for p in rx:
self.validate(p[1], v4_reply)
# Ensure MAP doesn't steal all packets
v6 = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
IPv6(src=self.pg1.remote_ip6, dst=self.pg1.remote_ip6) /
UDP(sport=20000, dport=10000) /
Raw('\xa5' * 100))
rx = self.send_and_expect(self.pg1, v6*1, self.pg1)
v6_reply = v6[1]
v6_reply.hlim -= 1
for p in rx:
self.validate(p[1], v6_reply)
map_route = VppIpRoute(self,
"2001:db8::",
32,
[VppRoutePath(self.pg1.remote_ip6,
self.pg1.sw_if_index,
proto=DpoProto.DPO_PROTO_IP6)],
is_ip6=1)
map_route.add_vpp_config()
#
# Send a v4 packet that will be translated
#
p_ether = Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
p_ip4 = IP(src=self.pg0.remote_ip4, dst='192.168.0.1')
payload = TCP(sport=0xabcd, dport=0xabcd)
p4 = (p_ether / p_ip4 / payload)
p6_translated = (IPv6(src="1234:5678:90ab:cdef:ac:1001:200:0",
dst="2001:db8:1f0::c0a8:1:f") / payload)
p6_translated.hlim -= 1
rx = self.send_and_expect(self.pg0, p4*1, self.pg1)
for p in rx:
self.validate(p[1], p6_translated)
# Send back an IPv6 packet that will be "untranslated"
p_ether6 = Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac)
p_ip6 = IPv6(src='2001:db8:1f0::c0a8:1:f',
dst='1234:5678:90ab:cdef:ac:1001:200:0')
p6 = (p_ether6 / p_ip6 / payload)
p4_translated = (IP(src='192.168.0.1',
dst=self.pg0.remote_ip4) / payload)
p4_translated.id = 0
p4_translated.ttl -= 1
rx = self.send_and_expect(self.pg1, p6*1, self.pg0)
for p in rx:
self.validate(p[1], p4_translated)
# IPv4 TTL
ip4_ttl_expired = IP(src=self.pg0.remote_ip4, dst='192.168.0.1', ttl=0)
p4 = (p_ether / ip4_ttl_expired / payload)
icmp4_reply = (IP(id=0, ttl=254, src=self.pg0.local_ip4,
dst=self.pg0.remote_ip4) /
ICMP(type='time-exceeded',
code='ttl-zero-during-transit') /
IP(src=self.pg0.remote_ip4,
dst='192.168.0.1', ttl=0) / payload)
rx = self.send_and_expect(self.pg0, p4*1, self.pg0)
for p in rx:
self.validate(p[1], icmp4_reply)
'''
This one is broken, cause it would require hairpinning...
# IPv4 TTL TTL1
ip4_ttl_expired = IP(src=self.pg0.remote_ip4, dst='192.168.0.1', ttl=1)
p4 = (p_ether / ip4_ttl_expired / payload)
icmp4_reply = IP(id=0, ttl=254, src=self.pg0.local_ip4,
dst=self.pg0.remote_ip4) / \
ICMP(type='time-exceeded', code='ttl-zero-during-transit' ) / \
IP(src=self.pg0.remote_ip4, dst='192.168.0.1', ttl=0) / payload
rx = self.send_and_expect(self.pg0, p4*1, self.pg0)
for p in rx:
self.validate(p[1], icmp4_reply)
'''
# IPv6 Hop limit
ip6_hlim_expired = IPv6(hlim=0, src='2001:db8:1ab::c0a8:1:ab',
dst='1234:5678:90ab:cdef:ac:1001:200:0')
p6 = (p_ether6 / ip6_hlim_expired / payload)
icmp6_reply = (IPv6(hlim=255, src=self.pg1.local_ip6,
dst="2001:db8:1ab::c0a8:1:ab") /
ICMPv6TimeExceeded(code=0) /
IPv6(src="2001:db8:1ab::c0a8:1:ab",
dst='1234:5678:90ab:cdef:ac:1001:200:0',
hlim=0) / payload)
rx = self.send_and_expect(self.pg1, p6*1, self.pg1)
for p in rx:
self.validate(p[1], icmp6_reply)
# IPv4 Well-known port
p_ip4 = IP(src=self.pg0.remote_ip4, dst='192.168.0.1')
payload = UDP(sport=200, dport=200)
p4 = (p_ether / p_ip4 / payload)
self.send_and_assert_no_replies(self.pg0, p4*1)
# IPv6 Well-known port
payload = UDP(sport=200, dport=200)
p6 = (p_ether6 / p_ip6 / payload)
self.send_and_assert_no_replies(self.pg1, p6*1)
# Packet fragmentation
payload = UDP(sport=40000, dport=4000) / self.payload(1453)
p4 = (p_ether / p_ip4 / payload)
self.pg_enable_capture()
self.pg0.add_stream(p4)
self.pg_start()
rx = self.pg1.get_capture(2)
for p in rx:
pass
# TODO: Manual validation
# self.validate(p[1], icmp4_reply)
# Packet fragmentation send fragments
payload = UDP(sport=40000, dport=4000) / self.payload(1453)
p4 = (p_ether / p_ip4 / payload)
frags = fragment(p4, fragsize=1000)
self.pg_enable_capture()
self.pg0.add_stream(frags)
self.pg_start()
rx = self.pg1.get_capture(2)
for p in rx:
pass
# p.show2()
# reass_pkt = reassemble(rx)
# p4_reply.ttl -= 1
# p4_reply.id = 256
# self.validate(reass_pkt, p4_reply)
# TCP MSS clamping
self.vapi.map_param_set_tcp(1300)
#
# Send a v4 TCP SYN packet that will be translated and MSS clamped
#
p_ether = Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
p_ip4 = IP(src=self.pg0.remote_ip4, dst='192.168.0.1')
payload = TCP(sport=0xabcd, dport=0xabcd, flags="S",
options=[('MSS', 1460)])
p4 = (p_ether / p_ip4 / payload)
p6_translated = (IPv6(src="1234:5678:90ab:cdef:ac:1001:200:0",
dst="2001:db8:1f0::c0a8:1:f") / payload)
p6_translated.hlim -= 1
p6_translated['TCP'].options = [('MSS', 1300)]
rx = self.send_and_expect(self.pg0, p4*1, self.pg1)
for p in rx:
self.validate(p[1], p6_translated)
# Send back an IPv6 packet that will be "untranslated"
p_ether6 = Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac)
p_ip6 = IPv6(src='2001:db8:1f0::c0a8:1:f',
dst='1234:5678:90ab:cdef:ac:1001:200:0')
p6 = (p_ether6 / p_ip6 / payload)
p4_translated = (IP(src='192.168.0.1',
dst=self.pg0.remote_ip4) / payload)
p4_translated.id = 0
p4_translated.ttl -= 1
p4_translated['TCP'].options = [('MSS', 1300)]
rx = self.send_and_expect(self.pg1, p6*1, self.pg0)
for p in rx:
self.validate(p[1], p4_translated)
if __name__ == '__main__':
unittest.main(testRunner=VppTestRunner)