ip: add IPv6 ping test for link-layer address

Type: improvement

Change-Id: I9f60e29462c7cb193a8594b7de06418b40573103
Signed-off-by: Benoît Ganne <bganne@cisco.com>
This commit is contained in:
Benoît Ganne
2021-01-18 19:25:38 +01:00
committed by Neale Ranns
parent 58a1915b50
commit 2699fe2ba8
3 changed files with 43 additions and 42 deletions

View File

@ -1169,6 +1169,7 @@ class TestICMPv6Echo(VppTestCase):
for i in self.pg_interfaces:
i.admin_up()
i.config_ip6()
i.resolve_ndp(link_layer=True)
i.resolve_ndp()
def tearDown(self):
@ -1186,39 +1187,34 @@ class TestICMPv6Echo(VppTestCase):
- Check outgoing ICMPv6 Echo Reply message on pg0 interface.
"""
icmpv6_id = 0xb
icmpv6_seq = 5
icmpv6_data = b'\x0a' * 18
p_echo_request = (Ether(src=self.pg0.remote_mac,
dst=self.pg0.local_mac) /
IPv6(src=self.pg0.remote_ip6,
dst=self.pg0.local_ip6) /
ICMPv6EchoRequest(
id=icmpv6_id,
seq=icmpv6_seq,
data=icmpv6_data))
# test both with global and local ipv6 addresses
dsts = (self.pg0.local_ip6, self.pg0.local_ip6_ll)
id = 0xb
seq = 5
data = b'\x0a' * 18
p = list()
for dst in dsts:
p.append((Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
IPv6(src=self.pg0.remote_ip6, dst=dst) /
ICMPv6EchoRequest(id=id, seq=seq, data=data)))
self.pg0.add_stream(p_echo_request)
self.pg0.add_stream(p)
self.pg_enable_capture(self.pg_interfaces)
self.pg_start()
rxs = self.pg0.get_capture(len(dsts))
rx = self.pg0.get_capture(1)
rx = rx[0]
ether = rx[Ether]
ipv6 = rx[IPv6]
icmpv6 = rx[ICMPv6EchoReply]
self.assertEqual(ether.src, self.pg0.local_mac)
self.assertEqual(ether.dst, self.pg0.remote_mac)
self.assertEqual(ipv6.src, self.pg0.local_ip6)
self.assertEqual(ipv6.dst, self.pg0.remote_ip6)
self.assertEqual(
icmp6types[icmpv6.type], "Echo Reply")
self.assertEqual(icmpv6.id, icmpv6_id)
self.assertEqual(icmpv6.seq, icmpv6_seq)
self.assertEqual(icmpv6.data, icmpv6_data)
for rx, dst in zip(rxs, dsts):
ether = rx[Ether]
ipv6 = rx[IPv6]
icmpv6 = rx[ICMPv6EchoReply]
self.assertEqual(ether.src, self.pg0.local_mac)
self.assertEqual(ether.dst, self.pg0.remote_mac)
self.assertEqual(ipv6.src, dst)
self.assertEqual(ipv6.dst, self.pg0.remote_ip6)
self.assertEqual(icmp6types[icmpv6.type], "Echo Reply")
self.assertEqual(icmpv6.id, id)
self.assertEqual(icmpv6.seq, seq)
self.assertEqual(icmpv6.data, data)
class TestIPv6RD(TestIPv6ND):

View File

@ -442,14 +442,16 @@ class VppPGInterface(VppInterface):
ARP(op=ARP.who_has, pdst=self.local_ip4,
psrc=self.remote_ip4, hwsrc=self.remote_mac))
def create_ndp_req(self):
def create_ndp_req(self, addr=None):
"""Create NDP - NS applicable for this interface"""
nsma = in6_getnsma(inet_pton(socket.AF_INET6, self.local_ip6))
if not addr:
addr = self.local_ip6
nsma = in6_getnsma(inet_pton(socket.AF_INET6, addr))
d = inet_ntop(socket.AF_INET6, nsma)
return (Ether(dst=in6_getnsmac(nsma)) /
IPv6(dst=d, src=self.remote_ip6) /
ICMPv6ND_NS(tgt=self.local_ip6) /
ICMPv6ND_NS(tgt=addr) /
ICMPv6NDOptSrcLLAddr(lladdr=self.remote_mac))
def resolve_arp(self, pg_interface=None):
@ -488,19 +490,22 @@ class VppPGInterface(VppInterface):
ppp("Unexpected response to ARP request:", captured_packet))
raise
def resolve_ndp(self, pg_interface=None, timeout=1):
def resolve_ndp(self, pg_interface=None, timeout=1, link_layer=False):
"""Resolve NDP using provided packet-generator interface
:param pg_interface: interface used to resolve, if None then this
interface is used
:param timeout: how long to wait for response before giving up
:param link_layer: resolve for global address if False (default)
or for link-layer address if True
"""
if pg_interface is None:
pg_interface = self
addr = self.local_ip6_ll if link_layer else self.local_ip6
self.test.logger.info("Sending NDP request for %s on port %s" %
(self.local_ip6, pg_interface.name))
ndp_req = self.create_ndp_req()
(addr, pg_interface.name))
ndp_req = self.create_ndp_req(addr)
pg_interface.add_stream(ndp_req)
pg_interface.enable_capture()
self.test.pg_start()

View File

@ -58,7 +58,7 @@ class VppSubInterface(VppPGInterface, metaclass=abc.ABCMeta):
pass
@abc.abstractmethod
def create_ndp_req(self):
def create_ndp_req(self, addr=None):
pass
def resolve_arp(self):
@ -154,8 +154,8 @@ class VppDot1QSubint(VppSubInterface):
packet = VppPGInterface.create_arp_req(self)
return self.add_dot1_layer(packet)
def create_ndp_req(self):
packet = VppPGInterface.create_ndp_req(self)
def create_ndp_req(self, addr=None):
packet = VppPGInterface.create_ndp_req(self, addr)
return self.add_dot1_layer(packet)
# called before sending packet
@ -196,8 +196,8 @@ class VppDot1ADSubint(VppSubInterface):
packet = VppPGInterface.create_arp_req(self)
return self.add_dot1_layer(packet)
def create_ndp_req(self):
packet = VppPGInterface.create_ndp_req(self)
def create_ndp_req(self, addr=None):
packet = VppPGInterface.create_ndp_req(self, addr)
return self.add_dot1_layer(packet)
def add_dot1_layer(self, packet):
@ -228,6 +228,6 @@ class VppP2PSubint(VppSubInterface):
packet = VppPGInterface.create_arp_req(self)
return packet
def create_ndp_req(self):
packet = VppPGInterface.create_ndp_req(self)
def create_ndp_req(self, addr=None):
packet = VppPGInterface.create_ndp_req(self, addr)
return packet