TEST:add l2bd nd term tests
Change-Id: I67633175d50a70a0b8ae4f85c659b93070f8e1fb Signed-off-by: Eyal Bari <ebari@cisco.com>
This commit is contained in:
@ -5,9 +5,18 @@ import unittest
|
||||
import random
|
||||
import copy
|
||||
|
||||
from socket import AF_INET6
|
||||
|
||||
from scapy.packet import Raw
|
||||
from scapy.layers.l2 import Ether, ARP
|
||||
from scapy.layers.inet import IP
|
||||
from scapy.utils import inet_pton, inet_ntop
|
||||
from scapy.utils6 import in6_getnsma, in6_getnsmac, in6_ptop, in6_islladdr, \
|
||||
in6_mactoifaceid, in6_ismaddr
|
||||
from scapy.layers.inet6 import IPv6, UDP, ICMPv6ND_NS, ICMPv6ND_RS, \
|
||||
ICMPv6ND_RA, ICMPv6NDOptSrcLLAddr, getmacbyip6, ICMPv6MRD_Solicitation, \
|
||||
ICMPv6NDOptMTU, ICMPv6NDOptSrcLLAddr, ICMPv6NDOptPrefixInfo, \
|
||||
ICMPv6ND_NA, ICMPv6NDOptDstLLAddr, ICMPv6DestUnreach, icmp6types
|
||||
|
||||
from framework import VppTestCase, VppTestRunner
|
||||
from util import Host, ppp, mactobinary
|
||||
@ -58,12 +67,13 @@ class TestL2bdArpTerm(VppTestCase):
|
||||
self.logger.info(self.vapi.ppcli("show l2fib verbose"))
|
||||
self.logger.info(self.vapi.ppcli("show bridge-domain 1 detail"))
|
||||
|
||||
def add_del_arp_term_hosts(self, entries, bd_id=1, is_add=1):
|
||||
def add_del_arp_term_hosts(self, entries, bd_id=1, is_add=1, is_ipv6=0):
|
||||
for e in entries:
|
||||
ip = e.ip4n if is_ipv6 == 0 else e.ip6n
|
||||
self.vapi.bd_ip_mac_add_del(bd_id=bd_id,
|
||||
mac=e.bin_mac,
|
||||
ip=e.ip4n,
|
||||
is_ipv6=0,
|
||||
ip=ip,
|
||||
is_ipv6=is_ipv6,
|
||||
is_add=is_add)
|
||||
|
||||
@classmethod
|
||||
@ -80,6 +90,16 @@ class TestL2bdArpTerm(VppTestCase):
|
||||
return {cls.ip4_host(subnet, start + j, mac_list[j])
|
||||
for j in range(len(mac_list))}
|
||||
|
||||
@classmethod
|
||||
def ip6_host(cls, subnet, host, mac):
|
||||
return Host(mac=mac,
|
||||
ip6="fd01:%x::%x" % (subnet, host))
|
||||
|
||||
@classmethod
|
||||
def ip6_hosts(cls, subnet, start, mac_list):
|
||||
return {cls.ip6_host(subnet, start + j, mac_list[j])
|
||||
for j in range(len(mac_list))}
|
||||
|
||||
@classmethod
|
||||
def bd_swifs(cls, b):
|
||||
n = cls.ifs_per_bd
|
||||
@ -108,7 +128,7 @@ class TestL2bdArpTerm(VppTestCase):
|
||||
def arp_reqs(cls, src_host, entries):
|
||||
return [cls.arp(src_host, e) for e in entries]
|
||||
|
||||
def response_host(self, src_host, arp_resp):
|
||||
def arp_resp_host(self, src_host, arp_resp):
|
||||
ether = arp_resp[Ether]
|
||||
self.assertEqual(ether.dst, src_host.mac)
|
||||
|
||||
@ -121,10 +141,37 @@ class TestL2bdArpTerm(VppTestCase):
|
||||
self.assertEqual(arp.op, arp_opts["is-at"])
|
||||
self.assertEqual(arp.hwdst, src_host.mac)
|
||||
self.assertEqual(arp.pdst, src_host.ip4)
|
||||
return Host(arp.hwsrc, arp.psrc)
|
||||
return Host(mac=arp.hwsrc, ip4=arp.psrc)
|
||||
|
||||
def arp_resp_hosts(self, src_host, pkts):
|
||||
return {self.response_host(src_host, p) for p in pkts}
|
||||
return {self.arp_resp_host(src_host, p) for p in pkts}
|
||||
|
||||
@classmethod
|
||||
def ns_req(cls, src_host, host):
|
||||
nsma = in6_getnsma(inet_pton(AF_INET6, "fd10::ffff"))
|
||||
d = inet_ntop(AF_INET6, nsma)
|
||||
return (Ether(dst="ff:ff:ff:ff:ff:ff", src=src_host.mac) /
|
||||
IPv6(dst=d, src=src_host.ip6) /
|
||||
ICMPv6ND_NS(tgt=host.ip6) /
|
||||
ICMPv6NDOptSrcLLAddr(lladdr=src_host.mac))
|
||||
|
||||
@classmethod
|
||||
def ns_reqs(cls, src_host, entries):
|
||||
return [cls.ns_req(src_host, e) for e in entries]
|
||||
|
||||
def na_resp_host(self, src_host, rx):
|
||||
self.assertEqual(rx[Ether].dst, src_host.mac)
|
||||
self.assertEqual(in6_ptop(rx[IPv6].dst),
|
||||
in6_ptop(src_host.ip6))
|
||||
|
||||
self.assertTrue(rx.haslayer(ICMPv6ND_NA))
|
||||
self.assertTrue(rx.haslayer(ICMPv6NDOptDstLLAddr))
|
||||
|
||||
na = rx[ICMPv6ND_NA]
|
||||
return Host(mac=na.lladdr, ip6=na.tgt)
|
||||
|
||||
def na_resp_hosts(self, src_host, pkts):
|
||||
return {self.na_resp_host(src_host, p) for p in pkts}
|
||||
|
||||
def set_bd_flags(self, bd_id, **args):
|
||||
"""
|
||||
@ -167,6 +214,20 @@ class TestL2bdArpTerm(VppTestCase):
|
||||
resps = self.arp_resp_hosts(src_host, resp_pkts)
|
||||
self.assertEqual(len(resps ^ resp_hosts), 0)
|
||||
|
||||
def verify_nd(self, src_host, req_hosts, resp_hosts, bd_id=1):
|
||||
reqs = self.ns_reqs(src_host, req_hosts)
|
||||
|
||||
for swif in self.bd_swifs(bd_id):
|
||||
swif.add_stream(reqs)
|
||||
|
||||
self.pg_enable_capture(self.pg_interfaces)
|
||||
self.pg_start()
|
||||
|
||||
for swif in self.bd_swifs(bd_id):
|
||||
resp_pkts = swif.get_capture(len(resp_hosts))
|
||||
resps = self.na_resp_hosts(src_host, resp_pkts)
|
||||
self.assertEqual(len(resps ^ resp_hosts), 0)
|
||||
|
||||
def test_l2bd_arp_term_01(self):
|
||||
""" L2BD arp term - add 5 hosts, verify arp responses
|
||||
"""
|
||||
@ -234,6 +295,58 @@ class TestL2bdArpTerm(VppTestCase):
|
||||
self.verify_arp(src_host, hosts1, hosts2)
|
||||
self.bd_add_del(1, is_add=0)
|
||||
|
||||
def test_l2bd_arp_term_06(self):
|
||||
""" L2BD arp/ND term - hosts with both ip4/ip6
|
||||
"""
|
||||
src_host4 = self.ip4_host(50, 50, "00:00:11:22:33:44")
|
||||
src_host6 = self.ip6_host(50, 50, "00:00:11:22:33:44")
|
||||
self.bd_add_del(1, is_add=1)
|
||||
# enable flood to make sure requests are not flooded
|
||||
self.set_bd_flags(1, arp_term=True, flood=True,
|
||||
uu_flood=False, learn=False)
|
||||
macs = self.mac_list(range(10, 20))
|
||||
hosts6 = self.ip6_hosts(5, 1, macs)
|
||||
hosts4 = self.ip4_hosts(5, 1, macs)
|
||||
self.add_del_arp_term_hosts(hosts4, is_add=1)
|
||||
self.add_del_arp_term_hosts(hosts6, is_add=1, is_ipv6=1)
|
||||
self.verify_arp(src_host4, hosts4, hosts4)
|
||||
self.verify_nd(src_host6, hosts6, hosts6)
|
||||
self.bd_add_del(1, is_add=0)
|
||||
|
||||
def test_l2bd_arp_term_07(self):
|
||||
""" L2BD ND term - Add and Del hosts, verify ND replies
|
||||
"""
|
||||
src_host6 = self.ip6_host(50, 50, "00:00:11:22:33:44")
|
||||
self.bd_add_del(1, is_add=1)
|
||||
self.set_bd_flags(1, arp_term=True, flood=False,
|
||||
uu_flood=False, learn=False)
|
||||
macs = self.mac_list(range(10, 20))
|
||||
hosts6 = self.ip6_hosts(5, 1, macs)
|
||||
self.add_del_arp_term_hosts(hosts6, is_add=1, is_ipv6=1)
|
||||
self.verify_nd(src_host6, hosts6, hosts6)
|
||||
del_macs = self.mac_list(range(10, 15))
|
||||
deleted = self.ip6_hosts(5, 1, del_macs)
|
||||
self.add_del_arp_term_hosts(deleted, is_add=0, is_ipv6=1)
|
||||
self.verify_nd(src_host6, hosts6, hosts6 - deleted)
|
||||
self.bd_add_del(1, is_add=0)
|
||||
|
||||
def test_l2bd_arp_term_08(self):
|
||||
""" L2BD ND term - Add and update IP+mac, verify ND replies
|
||||
"""
|
||||
src_host = self.ip6_host(50, 50, "00:00:11:22:33:44")
|
||||
self.bd_add_del(1, is_add=1)
|
||||
self.set_bd_flags(1, arp_term=True, flood=False,
|
||||
uu_flood=False, learn=False)
|
||||
macs1 = self.mac_list(range(10, 20))
|
||||
hosts = self.ip6_hosts(5, 1, macs1)
|
||||
self.add_del_arp_term_hosts(hosts, is_add=1, is_ipv6=1)
|
||||
self.verify_nd(src_host, hosts, hosts)
|
||||
macs2 = self.mac_list(range(20, 30))
|
||||
updated = self.ip6_hosts(5, 1, macs2)
|
||||
self.add_del_arp_term_hosts(updated, is_add=1, is_ipv6=1)
|
||||
self.verify_nd(src_host, hosts, updated)
|
||||
self.bd_add_del(1, is_add=0)
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
unittest.main(testRunner=VppTestRunner)
|
||||
|
Reference in New Issue
Block a user