MAP: Add check for well known ports.

And more unit-tests.

Change-Id: I4667d82d928b7ba8d96b5a5648d464115b3ed216
Signed-off-by: Ole Troan <ot@cisco.com>
This commit is contained in:
Ole Troan
2018-09-28 14:28:00 +02:00
parent bf4d126b81
commit 9be93c8f85
2 changed files with 155 additions and 2 deletions

View File

@ -8,8 +8,8 @@ from vpp_ip import DpoProto
from vpp_ip_route import VppIpRoute, VppRoutePath
from scapy.layers.l2 import Ether, Raw
from scapy.layers.inet import IP, UDP, ICMP
from scapy.layers.inet6 import IPv6
from scapy.layers.inet import IP, UDP, ICMP, TCP, fragment
from scapy.layers.inet6 import IPv6, ICMPv6TimeExceeded
class TestMAP(VppTestCase):
@ -163,5 +163,150 @@ class TestMAP(VppTestCase):
pre_res_route.remove_vpp_config()
self.vapi.ppcli("map params pre-resolve del ip6-nh 4001::1")
def validate(self, rx, expected):
self.assertEqual(rx, expected.__class__(str(expected)))
def payload(self, len):
return 'x' * len
def test_map_t(self):
""" MAP-T """
#
# Add a domain that maps from pg0 to pg1
#
map_dst = socket.inet_pton(socket.AF_INET6, "2001:db8::")
map_src = socket.inet_pton(socket.AF_INET6, "1234:5678:90ab:cdef::")
ip4_pfx = socket.inet_pton(socket.AF_INET, "192.168.0.0")
self.vapi.map_add_domain(map_dst, 32, map_src, 64, ip4_pfx,
24, 16, 6, 4, 1)
# Enable MAP-T on interfaces.
# self.vapi.map_if_enable_disable(1, self.pg0.sw_if_index, 1)
# self.vapi.map_if_enable_disable(1, self.pg1.sw_if_index, 1)
map_route = VppIpRoute(self,
"2001:db8::",
32,
[VppRoutePath(self.pg1.remote_ip6,
self.pg1.sw_if_index,
proto=DpoProto.DPO_PROTO_IP6)],
is_ip6=1)
map_route.add_vpp_config()
#
# Send a v4 packet that will be translated
#
p_ether = Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
p_ip4 = IP(src=self.pg0.remote_ip4, dst='192.168.0.1')
payload = TCP(sport=0xabcd, dport=0xabcd)
p4 = (p_ether / p_ip4 / payload)
p6_translated = (IPv6(src="1234:5678:90ab:cdef:ac:1001:200:0",
dst="2001:db8:1f0::c0a8:1:f") / payload)
p6_translated.hlim -= 1
rx = self.send_and_expect(self.pg0, p4*1, self.pg1)
for p in rx:
self.validate(p[1], p6_translated)
# Send back an IPv6 packet that will be "untranslated"
p_ether6 = Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac)
p_ip6 = IPv6(src='2001:db8:1f0::c0a8:1:f',
dst='1234:5678:90ab:cdef:ac:1001:200:0')
p6 = (p_ether6 / p_ip6 / payload)
p4_translated = (IP(src='192.168.0.1',
dst=self.pg0.remote_ip4) / payload)
p4_translated.id = 0
p4_translated.ttl -= 1
rx = self.send_and_expect(self.pg1, p6*1, self.pg0)
for p in rx:
self.validate(p[1], p4_translated)
# IPv4 TTL
ip4_ttl_expired = IP(src=self.pg0.remote_ip4, dst='192.168.0.1', ttl=0)
p4 = (p_ether / ip4_ttl_expired / payload)
icmp4_reply = (IP(id=0, ttl=254, src=self.pg0.local_ip4,
dst=self.pg0.remote_ip4) /
ICMP(type='time-exceeded',
code='ttl-zero-during-transit') /
IP(src=self.pg0.remote_ip4,
dst='192.168.0.1', ttl=0) / payload)
rx = self.send_and_expect(self.pg0, p4*1, self.pg0)
for p in rx:
self.validate(p[1], icmp4_reply)
'''
This one is broken, cause it would require hairpinning...
# IPv4 TTL TTL1
ip4_ttl_expired = IP(src=self.pg0.remote_ip4, dst='192.168.0.1', ttl=1)
p4 = (p_ether / ip4_ttl_expired / payload)
icmp4_reply = IP(id=0, ttl=254, src=self.pg0.local_ip4,
dst=self.pg0.remote_ip4) / \
ICMP(type='time-exceeded', code='ttl-zero-during-transit' ) / \
IP(src=self.pg0.remote_ip4, dst='192.168.0.1', ttl=0) / payload
rx = self.send_and_expect(self.pg0, p4*1, self.pg0)
for p in rx:
self.validate(p[1], icmp4_reply)
'''
# IPv6 Hop limit
ip6_hlim_expired = IPv6(hlim=0, src='2001:db8:1ab::c0a8:1:ab',
dst='1234:5678:90ab:cdef:ac:1001:200:0')
p6 = (p_ether6 / ip6_hlim_expired / payload)
icmp6_reply = (IPv6(hlim=255, src=self.pg1.local_ip6,
dst="2001:db8:1ab::c0a8:1:ab") /
ICMPv6TimeExceeded(code=0) /
IPv6(src="2001:db8:1ab::c0a8:1:ab",
dst='1234:5678:90ab:cdef:ac:1001:200:0',
hlim=0) / payload)
rx = self.send_and_expect(self.pg1, p6*1, self.pg1)
for p in rx:
self.validate(p[1], icmp6_reply)
# IPv4 Well-known port
p_ip4 = IP(src=self.pg0.remote_ip4, dst='192.168.0.1')
payload = UDP(sport=200, dport=200)
p4 = (p_ether / p_ip4 / payload)
self.send_and_assert_no_replies(self.pg0, p4*1)
# IPv6 Well-known port
payload = UDP(sport=200, dport=200)
p6 = (p_ether6 / p_ip6 / payload)
self.send_and_assert_no_replies(self.pg1, p6*1)
# Packet fragmentation
payload = UDP(sport=40000, dport=4000) / self.payload(1453)
p4 = (p_ether / p_ip4 / payload)
self.pg_enable_capture()
self.pg0.add_stream(p4)
self.pg_start()
rx = self.pg1.get_capture(2)
for p in rx:
pass
# TODO: Manual validation
# self.validate(p[1], icmp4_reply)
# Packet fragmentation send fragments
payload = UDP(sport=40000, dport=4000) / self.payload(1453)
p4 = (p_ether / p_ip4 / payload)
frags = fragment(p4, fragsize=1000)
self.pg_enable_capture()
self.pg0.add_stream(frags)
self.pg_start()
rx = self.pg1.get_capture(2)
for p in rx:
pass
# p.show2()
# reass_pkt = reassemble(rx)
# p4_reply.ttl -= 1
# p4_reply.id = 256
# self.validate(reass_pkt, p4_reply)
if __name__ == '__main__':
unittest.main(testRunner=VppTestRunner)