bfd: UT for BFD session over GRE tunnel

Signed-off-by: Neale Ranns <nranns@cisco.com>

Adding UT for BFD over GRE tunnel.

Ticket: none
Type: test

Change-Id: I03eca4a0c7114cdc16fde9be4cd6db6216b8c845
Signed-off-by: Neale Ranns <nranns@cisco.com>
This commit is contained in:
Neale Ranns 2019-06-05 10:28:17 +00:00 committed by Damjan Marion
parent 718a055fa9
commit 52cd49616d
2 changed files with 113 additions and 14 deletions

View File

@ -228,7 +228,7 @@ class VppBFDUDPSession(VppObject):
def __init__(self, test, interface, peer_addr, local_addr=None, af=AF_INET,
desired_min_tx=300000, required_min_rx=300000, detect_mult=3,
sha1_key=None, bfd_key_id=None):
sha1_key=None, bfd_key_id=None, is_tunnel=False):
self._test = test
self._interface = interface
self._af = af
@ -247,6 +247,7 @@ class VppBFDUDPSession(VppObject):
self._bfd_key_id = bfd_key_id
else:
self._bfd_key_id = randint(0, 255)
self._is_tunnel = is_tunnel
@property
def test(self):
@ -348,6 +349,10 @@ class VppBFDUDPSession(VppObject):
""" bfd key id in use """
return self._bfd_key_id
@property
def is_tunnel(self):
return self._is_tunnel
def activate_auth(self, key, bfd_key_id=None, delayed=False):
""" activate authentication for this session """
self._bfd_key_id = bfd_key_id if bfd_key_id else randint(0, 255)

View File

@ -15,7 +15,7 @@ from six import moves
import scapy.compat
from scapy.layers.inet import UDP, IP
from scapy.layers.inet6 import IPv6
from scapy.layers.l2 import Ether
from scapy.layers.l2 import Ether, GRE
from scapy.packet import Raw
from bfd import VppBFDAuthKey, BFD, BFDAuthType, VppBFDUDPSession, \
@ -27,6 +27,7 @@ from vpp_ip_route import VppIpRoute, VppRoutePath
from vpp_lo_interface import VppLoInterface
from vpp_papi_provider import UnexpectedApiReturnValueError
from vpp_pg_interface import CaptureTimeoutError, is_ipv6_misc
from vpp_gre_interface import VppGreInterface
USEC_IN_SEC = 1000000
@ -312,12 +313,17 @@ class BFDTestSession(object):
""" BFD session as seen from test framework side """
def __init__(self, test, interface, af, detect_mult=3, sha1_key=None,
bfd_key_id=None, our_seq_number=None):
bfd_key_id=None, our_seq_number=None,
tunnel_header=None, phy_interface=None):
self.test = test
self.af = af
self.sha1_key = sha1_key
self.bfd_key_id = bfd_key_id
self.interface = interface
if phy_interface:
self.phy_interface = phy_interface
else:
self.phy_interface = self.interface
self.udp_sport = randint(49152, 65535)
if our_seq_number is None:
self.our_seq_number = randint(0, 40000000)
@ -333,6 +339,7 @@ class BFDTestSession(object):
self.your_discriminator = None
self.state = BFDState.down
self.auth_type = BFDAuthType.no_auth
self.tunnel_header = tunnel_header
def inc_seq_num(self):
""" increment sequence number, wrapping if needed """
@ -418,17 +425,19 @@ class BFDTestSession(object):
bfd.length = BFD.sha1_auth_len + BFD.bfd_pkt_len
else:
bfd = BFD()
packet = Ether(src=self.phy_interface.remote_mac,
dst=self.phy_interface.local_mac)
if self.tunnel_header:
packet = packet / self.tunnel_header
if self.af == AF_INET6:
packet = (Ether(src=self.interface.remote_mac,
dst=self.interface.local_mac) /
packet = (packet /
IPv6(src=self.interface.remote_ip6,
dst=self.interface.local_ip6,
hlim=255) /
UDP(sport=self.udp_sport, dport=BFD.udp_dport) /
bfd)
else:
packet = (Ether(src=self.interface.remote_mac,
dst=self.interface.local_mac) /
packet = (packet /
IP(src=self.interface.remote_ip4,
dst=self.interface.local_ip4,
ttl=255) /
@ -450,7 +459,7 @@ class BFDTestSession(object):
if packet is None:
packet = self.create_packet()
if interface is None:
interface = self.test.pg0
interface = self.phy_interface
self.test.logger.debug(ppp("Sending packet:", packet))
interface.add_stream(packet)
self.test.pg_start()
@ -514,7 +523,7 @@ class BFDTestSession(object):
def bfd_session_up(test):
""" Bring BFD session up """
test.logger.info("BFD: Waiting for slow hello")
p = wait_for_bfd_packet(test, 2)
p = wait_for_bfd_packet(test, 2, is_tunnel=test.vpp_session.is_tunnel)
old_offset = None
if hasattr(test, 'vpp_clock_offset'):
old_offset = test.vpp_clock_offset
@ -589,13 +598,13 @@ def verify_ip(test, packet):
""" Verify correctness of IP layer. """
if test.vpp_session.af == AF_INET6:
ip = packet[IPv6]
local_ip = test.pg0.local_ip6
remote_ip = test.pg0.remote_ip6
local_ip = test.vpp_session.interface.local_ip6
remote_ip = test.vpp_session.interface.remote_ip6
test.assert_equal(ip.hlim, 255, "IPv6 hop limit")
else:
ip = packet[IP]
local_ip = test.pg0.local_ip4
remote_ip = test.pg0.remote_ip4
local_ip = test.vpp_session.interface.local_ip4
remote_ip = test.vpp_session.interface.remote_ip4
test.assert_equal(ip.ttl, 255, "IPv4 TTL")
test.assert_equal(ip.src, local_ip, "IP source address")
test.assert_equal(ip.dst, remote_ip, "IP destination address")
@ -633,7 +642,7 @@ def verify_event(test, event, expected_state):
test.assert_equal(e.state, expected_state, BFDState)
def wait_for_bfd_packet(test, timeout=1, pcap_time_min=None):
def wait_for_bfd_packet(test, timeout=1, pcap_time_min=None, is_tunnel=False):
""" wait for BFD packet and verify its correctness
:param timeout: how long to wait
@ -659,6 +668,10 @@ def wait_for_bfd_packet(test, timeout=1, pcap_time_min=None):
(p.time, pcap_time_min), p))
else:
break
if is_tunnel:
# strip an IP layer and move to the next
p = p[IP].payload
bfd = p[BFD]
if bfd is None:
raise Exception(ppp("Unexpected or invalid BFD packet:", p))
@ -1804,6 +1817,87 @@ class BFDFIBTestCase(VppTestCase):
packet[IPv6].dst)
@unittest.skipUnless(running_extended_tests, "part of extended tests")
class BFDTunTestCase(VppTestCase):
""" BFD over GRE tunnel """
vpp_session = None
test_session = None
@classmethod
def setUpClass(cls):
super(BFDTunTestCase, cls).setUpClass()
@classmethod
def tearDownClass(cls):
super(BFDTunTestCase, cls).tearDownClass()
def setUp(self):
super(BFDTunTestCase, self).setUp()
self.create_pg_interfaces(range(1))
self.vapi.want_bfd_events()
self.pg0.enable_capture()
for i in self.pg_interfaces:
i.admin_up()
i.config_ip4()
i.resolve_arp()
def tearDown(self):
if not self.vpp_dead:
self.vapi.want_bfd_events(enable_disable=0)
super(BFDTunTestCase, self).tearDown()
@staticmethod
def pkt_is_not_data_traffic(p):
""" not data traffic implies BFD or the usual IPv6 ND/RA"""
if p.haslayer(BFD) or is_ipv6_misc(p):
return True
return False
def test_bfd_o_gre(self):
""" BFD-o-GRE """
# A GRE interface over which to run a BFD session
gre_if = VppGreInterface(self,
self.pg0.local_ip4,
self.pg0.remote_ip4)
gre_if.add_vpp_config()
gre_if.admin_up()
gre_if.config_ip4()
# bring the session up now the routes are present
self.vpp_session = VppBFDUDPSession(self,
gre_if,
gre_if.remote_ip4,
is_tunnel=True)
self.vpp_session.add_vpp_config()
self.vpp_session.admin_up()
self.test_session = BFDTestSession(
self, gre_if, AF_INET,
tunnel_header=(IP(src=self.pg0.remote_ip4,
dst=self.pg0.local_ip4) /
GRE()),
phy_interface=self.pg0)
# packets to match against both of the routes
p = [(Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
IP(src=self.pg0.remote_ip4, dst=gre_if.remote_ip4) /
UDP(sport=1234, dport=1234) /
Raw('\xa5' * 100))]
# session is up - traffic passes
bfd_session_up(self)
self.send_and_expect(self.pg0, p, self.pg0)
# bring session down
bfd_session_down(self)
@unittest.skipUnless(running_extended_tests, "part of extended tests")
class BFDSHA1TestCase(VppTestCase):
"""Bidirectional Forwarding Detection (BFD) (SHA1 auth) """