d9b0c6fbf7
Drop pycodestyle for code style checking in favor of black. Black is much faster, stable PEP8 compliant code style checker offering also automatic formatting. It aims to be very stable and produce smallest diffs. It's used by many small and big projects. Running checkstyle with black takes a few seconds with a terse output. Thus, test-checkstyle-diff is no longer necessary. Expand scope of checkstyle to all python files in the repo, replacing test-checkstyle with checkstyle-python. Also, fixstyle-python is now available for automatic style formatting. Note: python virtualenv has been consolidated in test/Makefile, test/requirements*.txt which will eventually be moved to a central location. This is required to simply the automated generation of docker executor images in the CI. Type: improvement Change-Id: I022a326603485f58585e879ac0f697fceefbc9c8 Signed-off-by: Klement Sekera <klement.sekera@gmail.com> Signed-off-by: Dave Wallace <dwallacelf@gmail.com>
1531 lines
51 KiB
Python
1531 lines
51 KiB
Python
#!/usr/bin/env python3
|
|
|
|
#
|
|
# Copyright 2019-2020 Rubicon Communications, LLC (Netgate)
|
|
#
|
|
# SPDX-License-Identifier: Apache-2.0
|
|
#
|
|
|
|
import unittest
|
|
import time
|
|
import socket
|
|
from socket import inet_pton, inet_ntop
|
|
|
|
from vpp_object import VppObject
|
|
from vpp_papi import VppEnum
|
|
|
|
from scapy.packet import raw
|
|
from scapy.layers.l2 import Ether, ARP
|
|
from scapy.layers.inet import IP, ICMP, icmptypes
|
|
from scapy.layers.inet6 import (
|
|
IPv6,
|
|
ipv6nh,
|
|
IPv6ExtHdrHopByHop,
|
|
ICMPv6MLReport2,
|
|
ICMPv6ND_NA,
|
|
ICMPv6ND_NS,
|
|
ICMPv6NDOptDstLLAddr,
|
|
ICMPv6NDOptSrcLLAddr,
|
|
ICMPv6EchoRequest,
|
|
ICMPv6EchoReply,
|
|
)
|
|
from scapy.contrib.igmpv3 import IGMPv3, IGMPv3mr, IGMPv3gr
|
|
from scapy.layers.vrrp import IPPROTO_VRRP, VRRPv3
|
|
from scapy.utils6 import in6_getnsma, in6_getnsmac
|
|
from config import config
|
|
from framework import VppTestCase, VppTestRunner
|
|
from util import ip6_normalize
|
|
|
|
VRRP_VR_FLAG_PREEMPT = 1
|
|
VRRP_VR_FLAG_ACCEPT = 2
|
|
VRRP_VR_FLAG_UNICAST = 4
|
|
VRRP_VR_FLAG_IPV6 = 8
|
|
|
|
VRRP_VR_STATE_INIT = 0
|
|
VRRP_VR_STATE_BACKUP = 1
|
|
VRRP_VR_STATE_MASTER = 2
|
|
VRRP_VR_STATE_INTF_DOWN = 3
|
|
|
|
VRRP_INDEX_INVALID = 0xFFFFFFFF
|
|
|
|
|
|
def is_non_arp(p):
|
|
"""Want to filter out advertisements, igmp, etc"""
|
|
if p.haslayer(ARP):
|
|
return False
|
|
|
|
return True
|
|
|
|
|
|
def is_not_adv(p):
|
|
"""Filter out everything but advertisements. E.g. multicast RD/ND"""
|
|
if p.haslayer(VRRPv3):
|
|
return False
|
|
|
|
return True
|
|
|
|
|
|
def is_not_echo_reply(p):
|
|
"""filter out advertisements and other while waiting for echo reply"""
|
|
if p.haslayer(IP) and p.haslayer(ICMP):
|
|
if icmptypes[p[ICMP].type] == "echo-reply":
|
|
return False
|
|
elif p.haslayer(IPv6) and p.haslayer(ICMPv6EchoReply):
|
|
return False
|
|
|
|
return True
|
|
|
|
|
|
class VppVRRPVirtualRouter(VppObject):
|
|
def __init__(
|
|
self,
|
|
test,
|
|
intf,
|
|
vr_id,
|
|
prio=100,
|
|
intvl=100,
|
|
flags=VRRP_VR_FLAG_PREEMPT,
|
|
vips=None,
|
|
):
|
|
self._test = test
|
|
self._intf = intf
|
|
self._sw_if_index = self._intf.sw_if_index
|
|
self._vr_id = vr_id
|
|
self._prio = prio
|
|
self._intvl = intvl
|
|
self._flags = flags
|
|
if flags & VRRP_VR_FLAG_IPV6:
|
|
self._is_ipv6 = 1
|
|
self._adv_dest_mac = "33:33:00:00:00:12"
|
|
self._virtual_mac = "00:00:5e:00:02:%02x" % vr_id
|
|
self._adv_dest_ip = "ff02::12"
|
|
self._vips = [intf.local_ip6] if vips is None else vips
|
|
else:
|
|
self._is_ipv6 = 0
|
|
self._adv_dest_mac = "01:00:5e:00:00:12"
|
|
self._virtual_mac = "00:00:5e:00:01:%02x" % vr_id
|
|
self._adv_dest_ip = "224.0.0.18"
|
|
self._vips = [intf.local_ip4] if vips is None else vips
|
|
self._tracked_ifs = []
|
|
self._vrrp_index = VRRP_INDEX_INVALID
|
|
|
|
def add_vpp_config(self):
|
|
self._test.vapi.vrrp_vr_add_del(
|
|
is_add=1,
|
|
sw_if_index=self._intf.sw_if_index,
|
|
vr_id=self._vr_id,
|
|
priority=self._prio,
|
|
interval=self._intvl,
|
|
flags=self._flags,
|
|
n_addrs=len(self._vips),
|
|
addrs=self._vips,
|
|
)
|
|
|
|
def update_vpp_config(self):
|
|
r = self._test.vapi.vrrp_vr_update(
|
|
vrrp_index=self._vrrp_index,
|
|
sw_if_index=self._intf.sw_if_index,
|
|
vr_id=self._vr_id,
|
|
priority=self._prio,
|
|
interval=self._intvl,
|
|
flags=self._flags,
|
|
n_addrs=len(self._vips),
|
|
addrs=self._vips,
|
|
)
|
|
self._vrrp_index = r.vrrp_index
|
|
|
|
def delete_vpp_config(self):
|
|
self._test.vapi.vrrp_vr_del(vrrp_index=self._vrrp_index)
|
|
|
|
def query_vpp_config(self):
|
|
vrs = self._test.vapi.vrrp_vr_dump(sw_if_index=self._intf.sw_if_index)
|
|
for vr in vrs:
|
|
if vr.config.vr_id != self._vr_id:
|
|
continue
|
|
|
|
is_ipv6 = 1 if (vr.config.flags & VRRP_VR_FLAG_IPV6) else 0
|
|
if is_ipv6 != self._is_ipv6:
|
|
continue
|
|
|
|
return vr
|
|
|
|
return None
|
|
|
|
def remove_vpp_config(self):
|
|
self._test.vapi.vrrp_vr_add_del(
|
|
is_add=0,
|
|
sw_if_index=self._intf.sw_if_index,
|
|
vr_id=self._vr_id,
|
|
priority=self._prio,
|
|
interval=self._intvl,
|
|
flags=self._flags,
|
|
n_addrs=len(self._vips),
|
|
addrs=self._vips,
|
|
)
|
|
|
|
def start_stop(self, is_start):
|
|
self._test.vapi.vrrp_vr_start_stop(
|
|
is_start=is_start,
|
|
sw_if_index=self._intf.sw_if_index,
|
|
vr_id=self._vr_id,
|
|
is_ipv6=self._is_ipv6,
|
|
)
|
|
self._start_time = time.time() if is_start else None
|
|
|
|
def add_del_tracked_interface(self, is_add, sw_if_index, prio):
|
|
args = {
|
|
"sw_if_index": self._intf.sw_if_index,
|
|
"is_ipv6": self._is_ipv6,
|
|
"vr_id": self._vr_id,
|
|
"is_add": is_add,
|
|
"n_ifs": 1,
|
|
"ifs": [{"sw_if_index": sw_if_index, "priority": prio}],
|
|
}
|
|
self._test.vapi.vrrp_vr_track_if_add_del(**args)
|
|
self._tracked_ifs.append(args["ifs"][0])
|
|
|
|
def set_unicast_peers(self, addrs):
|
|
args = {
|
|
"sw_if_index": self._intf.sw_if_index,
|
|
"is_ipv6": self._is_ipv6,
|
|
"vr_id": self._vr_id,
|
|
"n_addrs": len(addrs),
|
|
"addrs": addrs,
|
|
}
|
|
self._test.vapi.vrrp_vr_set_peers(**args)
|
|
self._unicast_peers = addrs
|
|
|
|
def start_time(self):
|
|
return self._start_time
|
|
|
|
def virtual_mac(self):
|
|
return self._virtual_mac
|
|
|
|
def virtual_ips(self):
|
|
return self._vips
|
|
|
|
def adv_dest_mac(self):
|
|
return self._adv_dest_mac
|
|
|
|
def adv_dest_ip(self):
|
|
return self._adv_dest_ip
|
|
|
|
def priority(self):
|
|
return self._prio
|
|
|
|
def vr_id(self):
|
|
return self._vr_id
|
|
|
|
def adv_interval(self):
|
|
return self._intvl
|
|
|
|
def interface(self):
|
|
return self._intf
|
|
|
|
def assert_state_equals(self, state):
|
|
vr_details = self.query_vpp_config()
|
|
self._test.assertEqual(vr_details.runtime.state, state)
|
|
|
|
def master_down_seconds(self):
|
|
vr_details = self.query_vpp_config()
|
|
return vr_details.runtime.master_down_int * 0.01
|
|
|
|
def vrrp_adv_packet(self, prio=None, src_ip=None):
|
|
dst_ip = self._adv_dest_ip
|
|
if prio is None:
|
|
prio = self._prio
|
|
eth = Ether(dst=self._adv_dest_mac, src=self._virtual_mac)
|
|
vrrp = VRRPv3(
|
|
vrid=self._vr_id, priority=prio, ipcount=len(self._vips), adv=self._intvl
|
|
)
|
|
if self._is_ipv6:
|
|
src_ip = self._intf.local_ip6_ll if src_ip is None else src_ip
|
|
ip = IPv6(src=src_ip, dst=dst_ip, nh=IPPROTO_VRRP, hlim=255)
|
|
vrrp.addrlist = self._vips
|
|
else:
|
|
src_ip = self._intf.local_ip4 if src_ip is None else src_ip
|
|
ip = IP(src=src_ip, dst=dst_ip, proto=IPPROTO_VRRP, ttl=255, id=0)
|
|
vrrp.addrlist = self._vips
|
|
|
|
# Fill in default values & checksums
|
|
pkt = Ether(raw(eth / ip / vrrp))
|
|
return pkt
|
|
|
|
|
|
class TestVRRP4(VppTestCase):
|
|
"""IPv4 VRRP Test Case"""
|
|
|
|
@classmethod
|
|
def setUpClass(cls):
|
|
super(TestVRRP4, cls).setUpClass()
|
|
|
|
@classmethod
|
|
def tearDownClass(cls):
|
|
super(TestVRRP4, cls).tearDownClass()
|
|
|
|
def setUp(self):
|
|
super(TestVRRP4, self).setUp()
|
|
|
|
self.create_pg_interfaces(range(2))
|
|
|
|
for i in self.pg_interfaces:
|
|
i.admin_up()
|
|
i.config_ip4()
|
|
i.generate_remote_hosts(5)
|
|
i.configure_ipv4_neighbors()
|
|
|
|
self._vrs = []
|
|
self._default_flags = VRRP_VR_FLAG_PREEMPT
|
|
self._default_adv = 100
|
|
|
|
def tearDown(self):
|
|
for vr in self._vrs:
|
|
try:
|
|
vr_api = vr.query_vpp_config()
|
|
if vr_api.runtime.state != VRRP_VR_STATE_INIT:
|
|
vr.start_stop(is_start=0)
|
|
vr.remove_vpp_config()
|
|
except:
|
|
self.logger.error("Error cleaning up")
|
|
|
|
for i in self.pg_interfaces:
|
|
i.admin_down()
|
|
i.unconfig_ip4()
|
|
i.unconfig_ip6()
|
|
|
|
self._vrs = []
|
|
|
|
super(TestVRRP4, self).tearDown()
|
|
|
|
def verify_vrrp4_igmp(self, pkt):
|
|
ip = pkt[IP]
|
|
self.assertEqual(ip.dst, "224.0.0.22")
|
|
self.assertEqual(ip.proto, 2)
|
|
|
|
igmp = pkt[IGMPv3]
|
|
self.assertEqual(IGMPv3.igmpv3types[igmp.type], "Version 3 Membership Report")
|
|
|
|
igmpmr = pkt[IGMPv3mr]
|
|
self.assertEqual(igmpmr.numgrp, 1)
|
|
self.assertEqual(igmpmr.records[0].maddr, "224.0.0.18")
|
|
|
|
def verify_vrrp4_garp(self, pkt, vip, vmac):
|
|
arp = pkt[ARP]
|
|
|
|
# ARP "who-has" op == 1
|
|
self.assertEqual(arp.op, 1)
|
|
self.assertEqual(arp.pdst, arp.psrc)
|
|
self.assertEqual(arp.pdst, vip)
|
|
self.assertEqual(arp.hwsrc, vmac)
|
|
|
|
def verify_vrrp4_adv(self, rx_pkt, vr, prio=None):
|
|
vips = vr.virtual_ips()
|
|
eth = rx_pkt[Ether]
|
|
ip = rx_pkt[IP]
|
|
vrrp = rx_pkt[VRRPv3]
|
|
|
|
pkt = vr.vrrp_adv_packet(prio=prio)
|
|
|
|
# Source MAC is virtual MAC, destination is multicast MAC
|
|
self.assertEqual(eth.src, vr.virtual_mac())
|
|
self.assertEqual(eth.dst, vr.adv_dest_mac())
|
|
|
|
self.assertEqual(ip.dst, "224.0.0.18")
|
|
self.assertEqual(ip.ttl, 255)
|
|
self.assertEqual(ip.proto, IPPROTO_VRRP)
|
|
|
|
self.assertEqual(vrrp.version, 3)
|
|
self.assertEqual(vrrp.type, 1)
|
|
self.assertEqual(vrrp.vrid, vr.vr_id())
|
|
if prio is None:
|
|
prio = vr.priority()
|
|
self.assertEqual(vrrp.priority, prio)
|
|
self.assertEqual(vrrp.ipcount, len(vips))
|
|
self.assertEqual(vrrp.adv, vr.adv_interval())
|
|
self.assertListEqual(vrrp.addrlist, vips)
|
|
|
|
# VR with priority 255 owns the virtual address and should
|
|
# become master and start advertising immediately.
|
|
@unittest.skipUnless(config.extended, "part of extended tests")
|
|
def test_vrrp4_master_adv(self):
|
|
"""IPv4 Master VR advertises"""
|
|
self.pg_enable_capture(self.pg_interfaces)
|
|
self.pg_start()
|
|
|
|
prio = 255
|
|
intvl = self._default_adv
|
|
vr = VppVRRPVirtualRouter(
|
|
self, self.pg0, 100, prio=prio, intvl=intvl, flags=self._default_flags
|
|
)
|
|
|
|
vr.add_vpp_config()
|
|
vr.start_stop(is_start=1)
|
|
self.logger.info(self.vapi.cli("show vrrp vr"))
|
|
vr.start_stop(is_start=0)
|
|
self.logger.info(self.vapi.cli("show vrrp vr"))
|
|
|
|
pkts = self.pg0.get_capture(4)
|
|
|
|
# Init -> Master: IGMP Join, VRRP adv, gratuitous ARP are sent
|
|
self.verify_vrrp4_igmp(pkts[0])
|
|
self.verify_vrrp4_adv(pkts[1], vr, prio=prio)
|
|
self.verify_vrrp4_garp(pkts[2], vr.virtual_ips()[0], vr.virtual_mac())
|
|
# Master -> Init: Adv with priority 0 sent to force an election
|
|
self.verify_vrrp4_adv(pkts[3], vr, prio=0)
|
|
|
|
vr.remove_vpp_config()
|
|
self._vrs = []
|
|
|
|
# Same as above but with the update API, and add a change
|
|
# of parameters to test that too
|
|
@unittest.skipUnless(config.extended, "part of extended tests")
|
|
def test_vrrp4_master_adv_update(self):
|
|
"""IPv4 Master VR adv + Update to Backup"""
|
|
self.pg_enable_capture(self.pg_interfaces)
|
|
self.pg_start()
|
|
|
|
prio = 255
|
|
intvl = self._default_adv
|
|
vr = VppVRRPVirtualRouter(
|
|
self, self.pg0, 100, prio=prio, intvl=intvl, flags=self._default_flags
|
|
)
|
|
|
|
vr.update_vpp_config()
|
|
vr.start_stop(is_start=1)
|
|
self.logger.info(self.vapi.cli("show vrrp vr"))
|
|
# Update VR with lower prio and larger interval
|
|
# we need to keep old VR for the adv checks
|
|
upd_vr = VppVRRPVirtualRouter(
|
|
self,
|
|
self.pg0,
|
|
100,
|
|
prio=100,
|
|
intvl=2 * intvl,
|
|
flags=self._default_flags,
|
|
vips=[self.pg0.remote_ip4],
|
|
)
|
|
upd_vr._vrrp_index = vr._vrrp_index
|
|
upd_vr.update_vpp_config()
|
|
start_time = time.time()
|
|
self.logger.info(self.vapi.cli("show vrrp vr"))
|
|
upd_vr.assert_state_equals(VRRP_VR_STATE_BACKUP)
|
|
self._vrs = [upd_vr]
|
|
|
|
pkts = self.pg0.get_capture(5)
|
|
# Init -> Master: IGMP Join, VRRP adv, gratuitous ARP are sent
|
|
self.verify_vrrp4_igmp(pkts[0])
|
|
self.verify_vrrp4_adv(pkts[1], vr, prio=prio)
|
|
self.verify_vrrp4_garp(pkts[2], vr.virtual_ips()[0], vr.virtual_mac())
|
|
# Master -> Init: Adv with priority 0 sent to force an election
|
|
self.verify_vrrp4_adv(pkts[3], vr, prio=0)
|
|
# Init -> Backup: An IGMP join should be sent
|
|
self.verify_vrrp4_igmp(pkts[4])
|
|
|
|
# send higher prio advertisements, should not receive any
|
|
end_time = start_time + 2 * upd_vr.master_down_seconds()
|
|
src_ip = self.pg0.remote_ip4
|
|
pkts = [upd_vr.vrrp_adv_packet(prio=110, src_ip=src_ip)]
|
|
while time.time() < end_time:
|
|
self.send_and_assert_no_replies(self.pg0, pkts, timeout=intvl * 0.01)
|
|
self.logger.info(self.vapi.cli("show trace"))
|
|
|
|
upd_vr.start_stop(is_start=0)
|
|
self.logger.info(self.vapi.cli("show vrrp vr"))
|
|
|
|
# VR with priority < 255 enters backup state and does not advertise as
|
|
# long as it receives higher priority advertisements
|
|
@unittest.skipUnless(config.extended, "part of extended tests")
|
|
def test_vrrp4_backup_noadv(self):
|
|
"""IPv4 Backup VR does not advertise"""
|
|
self.pg_enable_capture(self.pg_interfaces)
|
|
self.pg_start()
|
|
|
|
vr_id = 100
|
|
prio = 100
|
|
intvl = self._default_adv
|
|
intvl_s = intvl * 0.01
|
|
vr = VppVRRPVirtualRouter(
|
|
self,
|
|
self.pg0,
|
|
vr_id,
|
|
prio=prio,
|
|
intvl=intvl,
|
|
flags=self._default_flags,
|
|
vips=[self.pg0.remote_ip4],
|
|
)
|
|
self._vrs.append(vr)
|
|
vr.add_vpp_config()
|
|
|
|
vr.start_stop(is_start=1)
|
|
|
|
vr.assert_state_equals(VRRP_VR_STATE_BACKUP)
|
|
# watch for advertisements for 2x the master down preemption timeout
|
|
end_time = vr.start_time() + 2 * vr.master_down_seconds()
|
|
|
|
# Init -> Backup: An IGMP join should be sent
|
|
pkts = self.pg0.get_capture(1)
|
|
self.verify_vrrp4_igmp(pkts[0])
|
|
|
|
# send higher prio advertisements, should not receive any
|
|
src_ip = self.pg0.remote_ip4
|
|
pkts = [vr.vrrp_adv_packet(prio=prio + 10, src_ip=src_ip)]
|
|
while time.time() < end_time:
|
|
self.send_and_assert_no_replies(self.pg0, pkts, timeout=intvl_s)
|
|
self.logger.info(self.vapi.cli("show trace"))
|
|
|
|
vr.start_stop(is_start=0)
|
|
self.logger.info(self.vapi.cli("show vrrp vr"))
|
|
vr.remove_vpp_config()
|
|
self._vrs = []
|
|
|
|
def test_vrrp4_master_arp(self):
|
|
"""IPv4 Master VR replies to ARP"""
|
|
self.pg_start()
|
|
|
|
# VR virtual IP is the default, which is the pg local IP
|
|
vr_id = 100
|
|
prio = 255
|
|
intvl = self._default_adv
|
|
vr = VppVRRPVirtualRouter(
|
|
self, self.pg0, 100, prio=prio, intvl=intvl, flags=self._default_flags
|
|
)
|
|
self._vrs.append(vr)
|
|
|
|
vr.add_vpp_config()
|
|
|
|
# before the VR is up, ARP should resolve to interface MAC
|
|
self.pg0.resolve_arp()
|
|
self.assertNotEqual(self.pg0.local_mac, vr.virtual_mac())
|
|
|
|
# start the VR, ARP should now resolve to virtual MAC
|
|
vr.start_stop(is_start=1)
|
|
self.pg0.resolve_arp()
|
|
self.assertEqual(self.pg0.local_mac, vr.virtual_mac())
|
|
|
|
# stop the VR, ARP should resolve to interface MAC again
|
|
vr.start_stop(is_start=0)
|
|
self.pg0.resolve_arp()
|
|
self.assertNotEqual(self.pg0.local_mac, vr.virtual_mac())
|
|
|
|
vr.remove_vpp_config()
|
|
self._vrs = []
|
|
|
|
@unittest.skipUnless(config.extended, "part of extended tests")
|
|
def test_vrrp4_backup_noarp(self):
|
|
"""IPv4 Backup VR ignores ARP"""
|
|
# We need an address for a virtual IP that is not the IP that
|
|
# ARP requests will originate from
|
|
|
|
vr_id = 100
|
|
prio = 100
|
|
intvl = self._default_adv
|
|
vip = self.pg0.remote_hosts[1].ip4
|
|
vr = VppVRRPVirtualRouter(
|
|
self,
|
|
self.pg0,
|
|
vr_id,
|
|
prio=prio,
|
|
intvl=intvl,
|
|
flags=self._default_flags,
|
|
vips=[vip],
|
|
)
|
|
self._vrs.append(vr)
|
|
vr.add_vpp_config()
|
|
|
|
arp_req = Ether(dst="ff:ff:ff:ff:ff:ff", src=self.pg0.remote_mac) / ARP(
|
|
op=ARP.who_has,
|
|
pdst=vip,
|
|
psrc=self.pg0.remote_ip4,
|
|
hwsrc=self.pg0.remote_mac,
|
|
)
|
|
|
|
# Before the VR is started make sure no reply to request for VIP
|
|
self.pg_start()
|
|
self.pg_enable_capture(self.pg_interfaces)
|
|
self.send_and_assert_no_replies(self.pg0, [arp_req], timeout=1)
|
|
|
|
# VR should start in backup state and still should not reply to ARP
|
|
# send a higher priority adv to make sure it does not become master
|
|
adv = vr.vrrp_adv_packet(prio=prio + 10, src_ip=self.pg0.remote_ip4)
|
|
vr.start_stop(is_start=1)
|
|
self.send_and_assert_no_replies(self.pg0, [adv, arp_req], timeout=1)
|
|
|
|
vr.start_stop(is_start=0)
|
|
vr.remove_vpp_config()
|
|
self._vrs = []
|
|
|
|
@unittest.skipUnless(config.extended, "part of extended tests")
|
|
def test_vrrp4_election(self):
|
|
"""IPv4 Backup VR becomes master if no advertisements received"""
|
|
|
|
vr_id = 100
|
|
prio = 100
|
|
intvl = self._default_adv
|
|
intvl_s = intvl * 0.01
|
|
vip = self.pg0.remote_ip4
|
|
vr = VppVRRPVirtualRouter(
|
|
self,
|
|
self.pg0,
|
|
vr_id,
|
|
prio=prio,
|
|
intvl=intvl,
|
|
flags=self._default_flags,
|
|
vips=[vip],
|
|
)
|
|
self._vrs.append(vr)
|
|
vr.add_vpp_config()
|
|
|
|
# After adding the VR, it should be in the init state
|
|
vr.assert_state_equals(VRRP_VR_STATE_INIT)
|
|
|
|
self.pg_start()
|
|
vr.start_stop(is_start=1)
|
|
|
|
# VR should be in backup state after starting
|
|
vr.assert_state_equals(VRRP_VR_STATE_BACKUP)
|
|
end_time = vr.start_time() + vr.master_down_seconds()
|
|
|
|
# should not receive adverts until timer expires & state transition
|
|
self.pg_enable_capture(self.pg_interfaces)
|
|
while (time.time() + intvl_s) < end_time:
|
|
time.sleep(intvl_s)
|
|
self.pg0.assert_nothing_captured(filter_out_fn=is_not_adv)
|
|
|
|
# VR should be in master state, should send an adv
|
|
self.pg0.enable_capture()
|
|
self.pg0.wait_for_packet(intvl_s, is_not_adv)
|
|
vr.assert_state_equals(VRRP_VR_STATE_MASTER)
|
|
|
|
@unittest.skipUnless(config.extended, "part of extended tests")
|
|
def test_vrrp4_backup_preempts(self):
|
|
"""IPv4 Backup VR preempts lower priority master"""
|
|
|
|
vr_id = 100
|
|
prio = 100
|
|
intvl = self._default_adv
|
|
intvl_s = intvl * 0.01
|
|
vip = self.pg0.remote_ip4
|
|
vr = VppVRRPVirtualRouter(
|
|
self,
|
|
self.pg0,
|
|
vr_id,
|
|
prio=prio,
|
|
intvl=intvl,
|
|
flags=self._default_flags,
|
|
vips=[vip],
|
|
)
|
|
self._vrs.append(vr)
|
|
vr.add_vpp_config()
|
|
|
|
# After adding the VR, it should be in the init state
|
|
vr.assert_state_equals(VRRP_VR_STATE_INIT)
|
|
|
|
self.pg_start()
|
|
vr.start_stop(is_start=1)
|
|
|
|
# VR should be in backup state after starting
|
|
vr.assert_state_equals(VRRP_VR_STATE_BACKUP)
|
|
end_time = vr.start_time() + vr.master_down_seconds()
|
|
|
|
# send lower prio advertisements until timer expires
|
|
src_ip = self.pg0.remote_ip4
|
|
pkts = [vr.vrrp_adv_packet(prio=prio - 10, src_ip=src_ip)]
|
|
while time.time() + intvl_s < end_time:
|
|
self.send_and_assert_no_replies(self.pg0, pkts, timeout=intvl_s)
|
|
self.logger.info(self.vapi.cli("show trace"))
|
|
|
|
# when timer expires, VR should take over as master
|
|
self.pg0.enable_capture()
|
|
self.pg0.wait_for_packet(timeout=intvl_s, filter_out_fn=is_not_adv)
|
|
vr.assert_state_equals(VRRP_VR_STATE_MASTER)
|
|
|
|
@unittest.skipUnless(config.extended, "part of extended tests")
|
|
def test_vrrp4_master_preempted(self):
|
|
"""IPv4 Master VR preempted by higher priority backup"""
|
|
|
|
# A prio 255 VR cannot be preempted so the prio has to be lower and
|
|
# we have to wait for it to take over
|
|
vr_id = 100
|
|
prio = 100
|
|
intvl = self._default_adv
|
|
vip = self.pg0.remote_ip4
|
|
vr = VppVRRPVirtualRouter(
|
|
self,
|
|
self.pg0,
|
|
vr_id,
|
|
prio=prio,
|
|
intvl=intvl,
|
|
flags=self._default_flags,
|
|
vips=[vip],
|
|
)
|
|
self._vrs.append(vr)
|
|
vr.add_vpp_config()
|
|
|
|
# After adding the VR, it should be in the init state
|
|
vr.assert_state_equals(VRRP_VR_STATE_INIT)
|
|
|
|
# start VR
|
|
vr.start_stop(is_start=1)
|
|
vr.assert_state_equals(VRRP_VR_STATE_BACKUP)
|
|
|
|
# wait for VR to take over as master
|
|
end_time = vr.start_time() + vr.master_down_seconds()
|
|
sleep_s = end_time - time.time()
|
|
time.sleep(sleep_s)
|
|
vr.assert_state_equals(VRRP_VR_STATE_MASTER)
|
|
|
|
# Build advertisement packet and send it
|
|
pkts = [vr.vrrp_adv_packet(prio=255, src_ip=self.pg0.remote_ip4)]
|
|
self.pg_send(self.pg0, pkts)
|
|
|
|
# VR should be in backup state again
|
|
vr.assert_state_equals(VRRP_VR_STATE_BACKUP)
|
|
|
|
@unittest.skipUnless(config.extended, "part of extended tests")
|
|
def test_vrrp4_accept_mode_disabled(self):
|
|
"""IPv4 Master VR does not reply for VIP w/ accept mode off"""
|
|
|
|
# accept mode only matters when prio < 255, so it will have to
|
|
# come up as a backup and take over as master after the timeout
|
|
vr_id = 100
|
|
prio = 100
|
|
intvl = self._default_adv
|
|
vip = self.pg0.remote_hosts[4].ip4
|
|
vr = VppVRRPVirtualRouter(
|
|
self,
|
|
self.pg0,
|
|
vr_id,
|
|
prio=prio,
|
|
intvl=intvl,
|
|
flags=self._default_flags,
|
|
vips=[vip],
|
|
)
|
|
self._vrs.append(vr)
|
|
vr.add_vpp_config()
|
|
|
|
# After adding the VR, it should be in the init state
|
|
vr.assert_state_equals(VRRP_VR_STATE_INIT)
|
|
|
|
# start VR
|
|
vr.start_stop(is_start=1)
|
|
vr.assert_state_equals(VRRP_VR_STATE_BACKUP)
|
|
|
|
# wait for VR to take over as master
|
|
end_time = vr.start_time() + vr.master_down_seconds()
|
|
sleep_s = end_time - time.time()
|
|
time.sleep(sleep_s)
|
|
vr.assert_state_equals(VRRP_VR_STATE_MASTER)
|
|
|
|
# send an ICMP echo to the VR virtual IP address
|
|
echo = (
|
|
Ether(dst=vr.virtual_mac(), src=self.pg0.remote_mac)
|
|
/ IP(dst=vip, src=self.pg0.remote_ip4)
|
|
/ ICMP(seq=1, id=self.pg0.sw_if_index, type="echo-request")
|
|
)
|
|
self.pg_send(self.pg0, [echo])
|
|
|
|
# wait for an echo reply. none should be received
|
|
time.sleep(1)
|
|
self.pg0.assert_nothing_captured(filter_out_fn=is_not_echo_reply)
|
|
|
|
@unittest.skipUnless(config.extended, "part of extended tests")
|
|
def test_vrrp4_accept_mode_enabled(self):
|
|
"""IPv4 Master VR replies for VIP w/ accept mode on"""
|
|
|
|
# A prio 255 VR cannot be preempted so the prio has to be lower and
|
|
# we have to wait for it to take over
|
|
vr_id = 100
|
|
prio = 100
|
|
intvl = self._default_adv
|
|
vip = self.pg0.remote_hosts[4].ip4
|
|
flags = VRRP_VR_FLAG_PREEMPT | VRRP_VR_FLAG_ACCEPT
|
|
vr = VppVRRPVirtualRouter(
|
|
self, self.pg0, vr_id, prio=prio, intvl=intvl, flags=flags, vips=[vip]
|
|
)
|
|
self._vrs.append(vr)
|
|
vr.add_vpp_config()
|
|
|
|
# After adding the VR, it should be in the init state
|
|
vr.assert_state_equals(VRRP_VR_STATE_INIT)
|
|
|
|
# start VR
|
|
vr.start_stop(is_start=1)
|
|
vr.assert_state_equals(VRRP_VR_STATE_BACKUP)
|
|
|
|
# wait for VR to take over as master
|
|
end_time = vr.start_time() + vr.master_down_seconds()
|
|
sleep_s = end_time - time.time()
|
|
time.sleep(sleep_s)
|
|
vr.assert_state_equals(VRRP_VR_STATE_MASTER)
|
|
|
|
# send an ICMP echo to the VR virtual IP address
|
|
echo = (
|
|
Ether(dst=vr.virtual_mac(), src=self.pg0.remote_mac)
|
|
/ IP(dst=vip, src=self.pg0.remote_ip4)
|
|
/ ICMP(seq=1, id=self.pg0.sw_if_index, type="echo-request")
|
|
)
|
|
self.pg_send(self.pg0, [echo])
|
|
|
|
# wait for an echo reply.
|
|
time.sleep(1)
|
|
rx_pkts = self.pg0.get_capture(
|
|
expected_count=1, timeout=1, filter_out_fn=is_not_echo_reply
|
|
)
|
|
|
|
self.assertEqual(rx_pkts[0][IP].src, vip)
|
|
self.assertEqual(rx_pkts[0][IP].dst, self.pg0.remote_ip4)
|
|
self.assertEqual(icmptypes[rx_pkts[0][ICMP].type], "echo-reply")
|
|
self.assertEqual(rx_pkts[0][ICMP].seq, 1)
|
|
self.assertEqual(rx_pkts[0][ICMP].id, self.pg0.sw_if_index)
|
|
|
|
@unittest.skipUnless(config.extended, "part of extended tests")
|
|
def test_vrrp4_intf_tracking(self):
|
|
"""IPv4 Master VR adjusts priority based on tracked interface"""
|
|
|
|
vr_id = 100
|
|
prio = 255
|
|
intvl = self._default_adv
|
|
intvl_s = intvl * 0.01
|
|
vip = self.pg0.local_ip4
|
|
vr = VppVRRPVirtualRouter(
|
|
self,
|
|
self.pg0,
|
|
vr_id,
|
|
prio=prio,
|
|
intvl=intvl,
|
|
flags=self._default_flags,
|
|
vips=[vip],
|
|
)
|
|
self._vrs.append(vr)
|
|
vr.add_vpp_config()
|
|
|
|
# After adding the VR, it should be in the init state
|
|
vr.assert_state_equals(VRRP_VR_STATE_INIT)
|
|
|
|
# add pg1 as a tracked interface and start the VR
|
|
adjustment = 50
|
|
adjusted_prio = prio - adjustment
|
|
vr.add_del_tracked_interface(
|
|
is_add=1, sw_if_index=self.pg1.sw_if_index, prio=adjustment
|
|
)
|
|
vr.start_stop(is_start=1)
|
|
vr.assert_state_equals(VRRP_VR_STATE_MASTER)
|
|
|
|
adv_configured = vr.vrrp_adv_packet(prio=prio)
|
|
adv_adjusted = vr.vrrp_adv_packet(prio=adjusted_prio)
|
|
|
|
# tracked intf is up -> advertised priority == configured priority
|
|
self.pg0.enable_capture()
|
|
rx = self.pg0.wait_for_packet(timeout=intvl_s, filter_out_fn=is_not_adv)
|
|
self.assertEqual(rx, adv_configured)
|
|
|
|
# take down pg1, verify priority is now being adjusted
|
|
self.pg1.admin_down()
|
|
self.pg0.enable_capture()
|
|
rx = self.pg0.wait_for_packet(timeout=intvl_s, filter_out_fn=is_not_adv)
|
|
self.assertEqual(rx, adv_adjusted)
|
|
|
|
# bring up pg1, verify priority now matches configured value
|
|
self.pg1.admin_up()
|
|
self.pg0.enable_capture()
|
|
rx = self.pg0.wait_for_packet(timeout=intvl_s, filter_out_fn=is_not_adv)
|
|
self.assertEqual(rx, adv_configured)
|
|
|
|
# remove IP address from pg1, verify priority now being adjusted
|
|
self.pg1.unconfig_ip4()
|
|
self.pg0.enable_capture()
|
|
rx = self.pg0.wait_for_packet(timeout=intvl_s, filter_out_fn=is_not_adv)
|
|
self.assertEqual(rx, adv_adjusted)
|
|
|
|
# add IP address to pg1, verify priority now matches configured value
|
|
self.pg1.config_ip4()
|
|
self.pg0.enable_capture()
|
|
rx = self.pg0.wait_for_packet(timeout=intvl_s, filter_out_fn=is_not_adv)
|
|
self.assertEqual(rx, adv_configured)
|
|
|
|
@unittest.skipUnless(config.extended, "part of extended tests")
|
|
def test_vrrp4_master_adv_unicast(self):
|
|
"""IPv4 Master VR advertises (unicast)"""
|
|
|
|
vr_id = 100
|
|
prio = 255
|
|
intvl = self._default_adv
|
|
intvl_s = intvl * 0.01
|
|
vip = self.pg0.local_ip4
|
|
flags = self._default_flags | VRRP_VR_FLAG_UNICAST
|
|
unicast_peer = self.pg0.remote_hosts[4]
|
|
vr = VppVRRPVirtualRouter(
|
|
self, self.pg0, vr_id, prio=prio, intvl=intvl, flags=flags, vips=[vip]
|
|
)
|
|
self._vrs.append(vr)
|
|
vr.add_vpp_config()
|
|
vr.set_unicast_peers([unicast_peer.ip4])
|
|
|
|
# After adding the VR, it should be in the init state
|
|
vr.assert_state_equals(VRRP_VR_STATE_INIT)
|
|
|
|
# Start VR, transition to master
|
|
vr.start_stop(is_start=1)
|
|
vr.assert_state_equals(VRRP_VR_STATE_MASTER)
|
|
|
|
self.pg0.enable_capture()
|
|
rx = self.pg0.wait_for_packet(timeout=intvl_s, filter_out_fn=is_not_adv)
|
|
|
|
self.assertTrue(rx.haslayer(Ether))
|
|
self.assertTrue(rx.haslayer(IP))
|
|
self.assertTrue(rx.haslayer(VRRPv3))
|
|
self.assertEqual(rx[Ether].src, self.pg0.local_mac)
|
|
self.assertEqual(rx[Ether].dst, unicast_peer.mac)
|
|
self.assertEqual(rx[IP].src, self.pg0.local_ip4)
|
|
self.assertEqual(rx[IP].dst, unicast_peer.ip4)
|
|
self.assertEqual(rx[VRRPv3].vrid, vr_id)
|
|
self.assertEqual(rx[VRRPv3].priority, prio)
|
|
self.assertEqual(rx[VRRPv3].ipcount, 1)
|
|
self.assertEqual(rx[VRRPv3].addrlist, [vip])
|
|
|
|
|
|
class TestVRRP6(VppTestCase):
|
|
"""IPv6 VRRP Test Case"""
|
|
|
|
@classmethod
|
|
def setUpClass(cls):
|
|
super(TestVRRP6, cls).setUpClass()
|
|
|
|
@classmethod
|
|
def tearDownClass(cls):
|
|
super(TestVRRP6, cls).tearDownClass()
|
|
|
|
def setUp(self):
|
|
super(TestVRRP6, self).setUp()
|
|
|
|
self.create_pg_interfaces(range(2))
|
|
|
|
for i in self.pg_interfaces:
|
|
i.admin_up()
|
|
i.config_ip6()
|
|
i.generate_remote_hosts(5)
|
|
i.configure_ipv6_neighbors()
|
|
|
|
self._vrs = []
|
|
self._default_flags = VRRP_VR_FLAG_IPV6 | VRRP_VR_FLAG_PREEMPT
|
|
self._default_adv = 100
|
|
|
|
def tearDown(self):
|
|
for vr in self._vrs:
|
|
try:
|
|
vr_api = vr.query_vpp_config()
|
|
if vr_api.runtime.state != VRRP_VR_STATE_INIT:
|
|
vr.start_stop(is_start=0)
|
|
vr.remove_vpp_config()
|
|
except:
|
|
self.logger.error("Error cleaning up")
|
|
|
|
for i in self.pg_interfaces:
|
|
i.admin_down()
|
|
i.unconfig_ip4()
|
|
i.unconfig_ip6()
|
|
|
|
self._vrs = []
|
|
|
|
super(TestVRRP6, self).tearDown()
|
|
|
|
def verify_vrrp6_mlr(self, pkt, vr):
|
|
ip6 = pkt[IPv6]
|
|
self.assertEqual(ip6.dst, "ff02::16")
|
|
self.assertEqual(ipv6nh[ip6.nh], "Hop-by-Hop Option Header")
|
|
|
|
hbh = pkt[IPv6ExtHdrHopByHop]
|
|
self.assertEqual(ipv6nh[hbh.nh], "ICMPv6")
|
|
|
|
self.assertTrue(pkt.haslayer(ICMPv6MLReport2))
|
|
mlr = pkt[ICMPv6MLReport2]
|
|
# should contain mc addr records for:
|
|
# - VRRPv3 multicast addr
|
|
# - solicited node mc addr record for each VR virtual IPv6 address
|
|
vips = vr.virtual_ips()
|
|
self.assertEqual(mlr.records_number, len(vips) + 1)
|
|
self.assertEqual(mlr.records[0].dst, vr.adv_dest_ip())
|
|
|
|
def verify_vrrp6_adv(self, rx_pkt, vr, prio=None):
|
|
self.assertTrue(rx_pkt.haslayer(Ether))
|
|
self.assertTrue(rx_pkt.haslayer(IPv6))
|
|
self.assertTrue(rx_pkt.haslayer(VRRPv3))
|
|
|
|
# generate a packet for this VR and compare it to the one received
|
|
pkt = vr.vrrp_adv_packet(prio=prio)
|
|
self.assertTrue(rx_pkt.haslayer(Ether))
|
|
self.assertTrue(rx_pkt.haslayer(IPv6))
|
|
self.assertTrue(rx_pkt.haslayer(VRRPv3))
|
|
|
|
self.assertEqual(pkt, rx_pkt)
|
|
|
|
def verify_vrrp6_gna(self, pkt, vr):
|
|
self.assertTrue(pkt.haslayer(Ether))
|
|
self.assertTrue(pkt.haslayer(IPv6))
|
|
self.assertTrue(pkt.haslayer(ICMPv6ND_NA))
|
|
self.assertTrue(pkt.haslayer(ICMPv6NDOptDstLLAddr))
|
|
|
|
self.assertEqual(pkt[Ether].dst, "33:33:00:00:00:01")
|
|
|
|
self.assertEqual(pkt[IPv6].dst, "ff02::1")
|
|
# convert addrs to packed format since string versions could differ
|
|
src_addr = inet_pton(socket.AF_INET6, pkt[IPv6].src)
|
|
vr_ll_addr = inet_pton(socket.AF_INET6, vr.interface().local_ip6_ll)
|
|
self.assertEqual(src_addr, vr_ll_addr)
|
|
|
|
self.assertTrue(pkt[ICMPv6ND_NA].tgt in vr.virtual_ips())
|
|
self.assertEqual(pkt[ICMPv6NDOptDstLLAddr].lladdr, vr.virtual_mac())
|
|
|
|
# VR with priority 255 owns the virtual address and should
|
|
# become master and start advertising immediately.
|
|
@unittest.skipUnless(config.extended, "part of extended tests")
|
|
def test_vrrp6_master_adv(self):
|
|
"""IPv6 Master VR advertises"""
|
|
self.pg_enable_capture(self.pg_interfaces)
|
|
self.pg_start()
|
|
|
|
prio = 255
|
|
intvl = self._default_adv
|
|
vr = VppVRRPVirtualRouter(
|
|
self, self.pg0, 100, prio=prio, intvl=intvl, flags=self._default_flags
|
|
)
|
|
self._vrs.append(vr)
|
|
|
|
vr.add_vpp_config()
|
|
self.logger.info(self.vapi.cli("show vrrp vr"))
|
|
vr.start_stop(is_start=1)
|
|
self.logger.info(self.vapi.cli("show vrrp vr"))
|
|
vr.start_stop(is_start=0)
|
|
self.logger.info(self.vapi.cli("show vrrp vr"))
|
|
|
|
pkts = self.pg0.get_capture(4, filter_out_fn=None)
|
|
|
|
# Init -> Master: Multicast group Join, VRRP adv, gratuitous NAs sent
|
|
self.verify_vrrp6_mlr(pkts[0], vr)
|
|
self.verify_vrrp6_adv(pkts[1], vr, prio=prio)
|
|
self.verify_vrrp6_gna(pkts[2], vr)
|
|
# Master -> Init: Adv with priority 0 sent to force an election
|
|
self.verify_vrrp6_adv(pkts[3], vr, prio=0)
|
|
|
|
vr.remove_vpp_config()
|
|
self._vrs = []
|
|
|
|
# Same as above but with the update API, and add a change
|
|
# of parameters to test that too
|
|
@unittest.skipUnless(config.extended, "part of extended tests")
|
|
def test_vrrp6_master_adv_update(self):
|
|
"""IPv6 Master VR adv + Update to Backup"""
|
|
self.pg_enable_capture(self.pg_interfaces)
|
|
self.pg_start()
|
|
|
|
prio = 255
|
|
intvl = self._default_adv
|
|
vr = VppVRRPVirtualRouter(
|
|
self, self.pg0, 100, prio=prio, intvl=intvl, flags=self._default_flags
|
|
)
|
|
|
|
vr.update_vpp_config()
|
|
vr.start_stop(is_start=1)
|
|
self.logger.info(self.vapi.cli("show vrrp vr"))
|
|
# Update VR with lower prio and larger interval
|
|
# we need to keep old VR for the adv checks
|
|
upd_vr = VppVRRPVirtualRouter(
|
|
self,
|
|
self.pg0,
|
|
100,
|
|
prio=100,
|
|
intvl=2 * intvl,
|
|
flags=self._default_flags,
|
|
vips=[self.pg0.remote_ip6],
|
|
)
|
|
upd_vr._vrrp_index = vr._vrrp_index
|
|
upd_vr.update_vpp_config()
|
|
start_time = time.time()
|
|
self.logger.info(self.vapi.cli("show vrrp vr"))
|
|
upd_vr.assert_state_equals(VRRP_VR_STATE_BACKUP)
|
|
self._vrs = [upd_vr]
|
|
|
|
pkts = self.pg0.get_capture(5, filter_out_fn=None)
|
|
|
|
# Init -> Master: Multicast group Join, VRRP adv, gratuitous NAs sent
|
|
self.verify_vrrp6_mlr(pkts[0], vr)
|
|
self.verify_vrrp6_adv(pkts[1], vr, prio=prio)
|
|
self.verify_vrrp6_gna(pkts[2], vr)
|
|
# Master -> Init: Adv with priority 0 sent to force an election
|
|
self.verify_vrrp6_adv(pkts[3], vr, prio=0)
|
|
# Init -> Backup: A multicast listener report should be sent
|
|
# not actually verified in the test below, where I took this from
|
|
|
|
# send higher prio advertisements, should not see VPP send any
|
|
src_ip = self.pg0.remote_ip6_ll
|
|
pkts = [upd_vr.vrrp_adv_packet(prio=110, src_ip=src_ip)]
|
|
self.logger.info(self.vapi.cli("show vlib graph"))
|
|
end_time = start_time + 2 * upd_vr.master_down_seconds()
|
|
while time.time() < end_time:
|
|
self.send_and_assert_no_replies(
|
|
self.pg0, pkts, timeout=0.01 * upd_vr._intvl
|
|
)
|
|
self.logger.info(self.vapi.cli("show trace"))
|
|
|
|
vr.start_stop(is_start=0)
|
|
self.logger.info(self.vapi.cli("show vrrp vr"))
|
|
|
|
# VR with priority < 255 enters backup state and does not advertise as
|
|
# long as it receives higher priority advertisements
|
|
@unittest.skipUnless(config.extended, "part of extended tests")
|
|
def test_vrrp6_backup_noadv(self):
|
|
"""IPv6 Backup VR does not advertise"""
|
|
self.pg_enable_capture(self.pg_interfaces)
|
|
self.pg_start()
|
|
|
|
vr_id = 100
|
|
prio = 100
|
|
intvl = self._default_adv
|
|
intvl_s = intvl * 0.01
|
|
vr = VppVRRPVirtualRouter(
|
|
self,
|
|
self.pg0,
|
|
vr_id,
|
|
prio=prio,
|
|
intvl=intvl,
|
|
flags=self._default_flags,
|
|
vips=[self.pg0.remote_ip6],
|
|
)
|
|
vr.add_vpp_config()
|
|
self._vrs.append(vr)
|
|
|
|
vr.start_stop(is_start=1)
|
|
|
|
vr.assert_state_equals(VRRP_VR_STATE_BACKUP)
|
|
# watch for advertisements for 2x the master down preemption timeout
|
|
end_time = vr.start_time() + 2 * vr.master_down_seconds()
|
|
|
|
# Init -> Backup: A multicast listener report should be sent
|
|
pkts = self.pg0.get_capture(1, filter_out_fn=None)
|
|
|
|
# send higher prio advertisements, should not see VPP send any
|
|
src_ip = self.pg0.remote_ip6_ll
|
|
num_advs = 5
|
|
pkts = [vr.vrrp_adv_packet(prio=prio + 10, src_ip=src_ip)]
|
|
self.logger.info(self.vapi.cli("show vlib graph"))
|
|
while time.time() < end_time:
|
|
self.send_and_assert_no_replies(self.pg0, pkts, timeout=intvl_s)
|
|
self.logger.info(self.vapi.cli("show trace"))
|
|
num_advs -= 1
|
|
|
|
vr.start_stop(is_start=0)
|
|
self.logger.info(self.vapi.cli("show vrrp vr"))
|
|
vr.remove_vpp_config()
|
|
self._vrs = []
|
|
|
|
def test_vrrp6_master_nd(self):
|
|
"""IPv6 Master VR replies to NDP"""
|
|
self.pg_start()
|
|
|
|
# VR virtual IP is the default, which is the pg local IP
|
|
vr_id = 100
|
|
prio = 255
|
|
intvl = self._default_adv
|
|
vr = VppVRRPVirtualRouter(
|
|
self, self.pg0, 100, prio=prio, intvl=intvl, flags=self._default_flags
|
|
)
|
|
vr.add_vpp_config()
|
|
self._vrs.append(vr)
|
|
|
|
# before the VR is up, NDP should resolve to interface MAC
|
|
self.pg0.resolve_ndp()
|
|
self.assertNotEqual(self.pg0.local_mac, vr.virtual_mac())
|
|
|
|
# start the VR, NDP should now resolve to virtual MAC
|
|
vr.start_stop(is_start=1)
|
|
self.pg0.resolve_ndp()
|
|
self.assertEqual(self.pg0.local_mac, vr.virtual_mac())
|
|
|
|
# stop the VR, ARP should resolve to interface MAC again
|
|
vr.start_stop(is_start=0)
|
|
self.pg0.resolve_ndp()
|
|
self.assertNotEqual(self.pg0.local_mac, vr.virtual_mac())
|
|
|
|
vr.remove_vpp_config()
|
|
self._vrs = []
|
|
|
|
@unittest.skipUnless(config.extended, "part of extended tests")
|
|
def test_vrrp6_backup_nond(self):
|
|
"""IPv6 Backup VR ignores NDP"""
|
|
# We need an address for a virtual IP that is not the IP that
|
|
# ARP requests will originate from
|
|
|
|
vr_id = 100
|
|
prio = 100
|
|
intvl = self._default_adv
|
|
intvl_s = intvl * 0.01
|
|
vip = self.pg0.remote_hosts[1].ip6
|
|
vr = VppVRRPVirtualRouter(
|
|
self,
|
|
self.pg0,
|
|
vr_id,
|
|
prio=prio,
|
|
intvl=intvl,
|
|
flags=self._default_flags,
|
|
vips=[vip],
|
|
)
|
|
vr.add_vpp_config()
|
|
self._vrs.append(vr)
|
|
|
|
nsma = in6_getnsma(inet_pton(socket.AF_INET6, vip))
|
|
dmac = in6_getnsmac(nsma)
|
|
dst_ip = inet_ntop(socket.AF_INET6, nsma)
|
|
|
|
ndp_req = (
|
|
Ether(dst=dmac, src=self.pg0.remote_mac)
|
|
/ IPv6(dst=dst_ip, src=self.pg0.remote_ip6)
|
|
/ ICMPv6ND_NS(tgt=vip)
|
|
/ ICMPv6NDOptSrcLLAddr(lladdr=self.pg0.remote_mac)
|
|
)
|
|
|
|
# Before the VR is started make sure no reply to request for VIP
|
|
self.send_and_assert_no_replies(self.pg0, [ndp_req], timeout=1)
|
|
|
|
# VR should start in backup state and still should not reply to NDP
|
|
# send a higher priority adv to make sure it does not become master
|
|
adv = vr.vrrp_adv_packet(prio=prio + 10, src_ip=self.pg0.remote_ip6)
|
|
pkts = [adv, ndp_req]
|
|
vr.start_stop(is_start=1)
|
|
self.send_and_assert_no_replies(self.pg0, pkts, timeout=intvl_s)
|
|
|
|
vr.start_stop(is_start=0)
|
|
|
|
@unittest.skipUnless(config.extended, "part of extended tests")
|
|
def test_vrrp6_election(self):
|
|
"""IPv6 Backup VR becomes master if no advertisements received"""
|
|
|
|
vr_id = 100
|
|
prio = 100
|
|
intvl = self._default_adv
|
|
intvl_s = intvl * 0.01
|
|
vip = self.pg0.remote_ip6
|
|
vr = VppVRRPVirtualRouter(
|
|
self,
|
|
self.pg0,
|
|
vr_id,
|
|
prio=prio,
|
|
intvl=intvl,
|
|
flags=self._default_flags,
|
|
vips=[vip],
|
|
)
|
|
self._vrs.append(vr)
|
|
vr.add_vpp_config()
|
|
|
|
# After adding the VR, it should be in the init state
|
|
vr.assert_state_equals(VRRP_VR_STATE_INIT)
|
|
|
|
self.pg_start()
|
|
vr.start_stop(is_start=1)
|
|
|
|
# VR should be in backup state after starting
|
|
vr.assert_state_equals(VRRP_VR_STATE_BACKUP)
|
|
end_time = vr.start_time() + vr.master_down_seconds()
|
|
|
|
# no advertisements should arrive until timer expires
|
|
self.pg0.enable_capture()
|
|
while (time.time() + intvl_s) < end_time:
|
|
time.sleep(intvl_s)
|
|
self.pg0.assert_nothing_captured(filter_out_fn=is_not_adv)
|
|
|
|
# VR should be in master state after timer expires
|
|
self.pg0.enable_capture()
|
|
self.pg0.wait_for_packet(intvl_s, is_not_adv)
|
|
vr.assert_state_equals(VRRP_VR_STATE_MASTER)
|
|
|
|
@unittest.skipUnless(config.extended, "part of extended tests")
|
|
def test_vrrp6_backup_preempts(self):
|
|
"""IPv6 Backup VR preempts lower priority master"""
|
|
|
|
vr_id = 100
|
|
prio = 100
|
|
intvl = self._default_adv
|
|
intvl_s = intvl * 0.01
|
|
vip = self.pg0.remote_ip6
|
|
vr = VppVRRPVirtualRouter(
|
|
self,
|
|
self.pg0,
|
|
vr_id,
|
|
prio=prio,
|
|
intvl=intvl,
|
|
flags=self._default_flags,
|
|
vips=[vip],
|
|
)
|
|
self._vrs.append(vr)
|
|
vr.add_vpp_config()
|
|
|
|
# After adding the VR, it should be in the init state
|
|
vr.assert_state_equals(VRRP_VR_STATE_INIT)
|
|
|
|
self.pg_start()
|
|
vr.start_stop(is_start=1)
|
|
|
|
# VR should be in backup state after starting
|
|
vr.assert_state_equals(VRRP_VR_STATE_BACKUP)
|
|
end_time = vr.start_time() + vr.master_down_seconds()
|
|
|
|
# send lower prio advertisements until timer expires
|
|
src_ip = self.pg0.remote_ip6
|
|
pkts = [vr.vrrp_adv_packet(prio=prio - 10, src_ip=src_ip)]
|
|
while (time.time() + intvl_s) < end_time:
|
|
self.send_and_assert_no_replies(self.pg0, pkts, timeout=intvl_s)
|
|
self.logger.info(self.vapi.cli("show trace"))
|
|
|
|
# when timer expires, VR should take over as master
|
|
self.pg0.enable_capture()
|
|
self.pg0.wait_for_packet(timeout=intvl_s, filter_out_fn=is_not_adv)
|
|
vr.assert_state_equals(VRRP_VR_STATE_MASTER)
|
|
|
|
@unittest.skipUnless(config.extended, "part of extended tests")
|
|
def test_vrrp6_master_preempted(self):
|
|
"""IPv6 Master VR preempted by higher priority backup"""
|
|
|
|
# A prio 255 VR cannot be preempted so the prio has to be lower and
|
|
# we have to wait for it to take over
|
|
vr_id = 100
|
|
prio = 100
|
|
intvl = self._default_adv
|
|
vip = self.pg0.remote_ip6
|
|
vr = VppVRRPVirtualRouter(
|
|
self,
|
|
self.pg0,
|
|
vr_id,
|
|
prio=prio,
|
|
intvl=intvl,
|
|
flags=self._default_flags,
|
|
vips=[vip],
|
|
)
|
|
self._vrs.append(vr)
|
|
vr.add_vpp_config()
|
|
|
|
# After adding the VR, it should be in the init state
|
|
vr.assert_state_equals(VRRP_VR_STATE_INIT)
|
|
|
|
# start VR
|
|
vr.start_stop(is_start=1)
|
|
vr.assert_state_equals(VRRP_VR_STATE_BACKUP)
|
|
|
|
# wait for VR to take over as master
|
|
end_time = vr.start_time() + vr.master_down_seconds()
|
|
sleep_s = end_time - time.time()
|
|
time.sleep(sleep_s)
|
|
vr.assert_state_equals(VRRP_VR_STATE_MASTER)
|
|
|
|
# Build advertisement packet and send it
|
|
pkts = [vr.vrrp_adv_packet(prio=255, src_ip=self.pg0.remote_ip6)]
|
|
self.pg_send(self.pg0, pkts)
|
|
|
|
# VR should be in backup state again
|
|
vr.assert_state_equals(VRRP_VR_STATE_BACKUP)
|
|
|
|
@unittest.skipUnless(config.extended, "part of extended tests")
|
|
def test_vrrp6_accept_mode_disabled(self):
|
|
"""IPv6 Master VR does not reply for VIP w/ accept mode off"""
|
|
|
|
# accept mode only matters when prio < 255, so it will have to
|
|
# come up as a backup and take over as master after the timeout
|
|
vr_id = 100
|
|
prio = 100
|
|
intvl = self._default_adv
|
|
vip = self.pg0.remote_hosts[4].ip6
|
|
vr = VppVRRPVirtualRouter(
|
|
self,
|
|
self.pg0,
|
|
vr_id,
|
|
prio=prio,
|
|
intvl=intvl,
|
|
flags=self._default_flags,
|
|
vips=[vip],
|
|
)
|
|
self._vrs.append(vr)
|
|
vr.add_vpp_config()
|
|
|
|
# After adding the VR, it should be in the init state
|
|
vr.assert_state_equals(VRRP_VR_STATE_INIT)
|
|
|
|
# start VR
|
|
vr.start_stop(is_start=1)
|
|
vr.assert_state_equals(VRRP_VR_STATE_BACKUP)
|
|
|
|
# wait for VR to take over as master
|
|
end_time = vr.start_time() + vr.master_down_seconds()
|
|
sleep_s = end_time - time.time()
|
|
time.sleep(sleep_s)
|
|
vr.assert_state_equals(VRRP_VR_STATE_MASTER)
|
|
|
|
# send an ICMPv6 echo to the VR virtual IP address
|
|
echo = (
|
|
Ether(dst=vr.virtual_mac(), src=self.pg0.remote_mac)
|
|
/ IPv6(dst=vip, src=self.pg0.remote_ip6)
|
|
/ ICMPv6EchoRequest(seq=1, id=self.pg0.sw_if_index)
|
|
)
|
|
self.pg_send(self.pg0, [echo])
|
|
|
|
# wait for an echo reply. none should be received
|
|
time.sleep(1)
|
|
self.pg0.assert_nothing_captured(filter_out_fn=is_not_echo_reply)
|
|
|
|
@unittest.skipUnless(config.extended, "part of extended tests")
|
|
def test_vrrp6_accept_mode_enabled(self):
|
|
"""IPv6 Master VR replies for VIP w/ accept mode on"""
|
|
|
|
# A prio 255 VR cannot be preempted so the prio has to be lower and
|
|
# we have to wait for it to take over
|
|
vr_id = 100
|
|
prio = 100
|
|
intvl = self._default_adv
|
|
vip = self.pg0.remote_hosts[4].ip6
|
|
flags = self._default_flags | VRRP_VR_FLAG_ACCEPT
|
|
vr = VppVRRPVirtualRouter(
|
|
self, self.pg0, vr_id, prio=prio, intvl=intvl, flags=flags, vips=[vip]
|
|
)
|
|
self._vrs.append(vr)
|
|
vr.add_vpp_config()
|
|
|
|
# After adding the VR, it should be in the init state
|
|
vr.assert_state_equals(VRRP_VR_STATE_INIT)
|
|
|
|
# start VR
|
|
vr.start_stop(is_start=1)
|
|
vr.assert_state_equals(VRRP_VR_STATE_BACKUP)
|
|
|
|
# wait for VR to take over as master
|
|
end_time = vr.start_time() + vr.master_down_seconds()
|
|
sleep_s = end_time - time.time()
|
|
time.sleep(sleep_s)
|
|
vr.assert_state_equals(VRRP_VR_STATE_MASTER)
|
|
|
|
# send an ICMP echo to the VR virtual IP address
|
|
echo = (
|
|
Ether(dst=vr.virtual_mac(), src=self.pg0.remote_mac)
|
|
/ IPv6(dst=vip, src=self.pg0.remote_ip6)
|
|
/ ICMPv6EchoRequest(seq=1, id=self.pg0.sw_if_index)
|
|
)
|
|
self.pg_send(self.pg0, [echo])
|
|
|
|
# wait for an echo reply.
|
|
time.sleep(1)
|
|
rx_pkts = self.pg0.get_capture(
|
|
expected_count=1, timeout=1, filter_out_fn=is_not_echo_reply
|
|
)
|
|
|
|
self.assertEqual(rx_pkts[0][IPv6].src, vip)
|
|
self.assertEqual(rx_pkts[0][IPv6].dst, self.pg0.remote_ip6)
|
|
self.assertEqual(rx_pkts[0][ICMPv6EchoReply].seq, 1)
|
|
self.assertEqual(rx_pkts[0][ICMPv6EchoReply].id, self.pg0.sw_if_index)
|
|
|
|
@unittest.skipUnless(config.extended, "part of extended tests")
|
|
def test_vrrp6_intf_tracking(self):
|
|
"""IPv6 Master VR adjusts priority based on tracked interface"""
|
|
|
|
vr_id = 100
|
|
prio = 255
|
|
intvl = self._default_adv
|
|
intvl_s = intvl * 0.01
|
|
vip = self.pg0.local_ip6
|
|
vr = VppVRRPVirtualRouter(
|
|
self,
|
|
self.pg0,
|
|
vr_id,
|
|
prio=prio,
|
|
intvl=intvl,
|
|
flags=self._default_flags,
|
|
vips=[vip],
|
|
)
|
|
self._vrs.append(vr)
|
|
vr.add_vpp_config()
|
|
|
|
# After adding the VR, it should be in the init state
|
|
vr.assert_state_equals(VRRP_VR_STATE_INIT)
|
|
|
|
# add pg1 as a tracked interface and start the VR
|
|
adjustment = 50
|
|
adjusted_prio = prio - adjustment
|
|
vr.add_del_tracked_interface(
|
|
is_add=1, sw_if_index=self.pg1.sw_if_index, prio=adjustment
|
|
)
|
|
vr.start_stop(is_start=1)
|
|
vr.assert_state_equals(VRRP_VR_STATE_MASTER)
|
|
|
|
adv_configured = vr.vrrp_adv_packet(prio=prio)
|
|
adv_adjusted = vr.vrrp_adv_packet(prio=adjusted_prio)
|
|
|
|
# tracked intf is up -> advertised priority == configured priority
|
|
self.pg0.enable_capture()
|
|
rx = self.pg0.wait_for_packet(timeout=intvl_s, filter_out_fn=is_not_adv)
|
|
self.assertEqual(rx, adv_configured)
|
|
|
|
# take down pg1, verify priority is now being adjusted
|
|
self.pg1.admin_down()
|
|
self.pg0.enable_capture()
|
|
rx = self.pg0.wait_for_packet(timeout=intvl_s, filter_out_fn=is_not_adv)
|
|
self.assertEqual(rx, adv_adjusted)
|
|
|
|
# bring up pg1, verify priority now matches configured value
|
|
self.pg1.admin_up()
|
|
self.pg0.enable_capture()
|
|
rx = self.pg0.wait_for_packet(timeout=intvl_s, filter_out_fn=is_not_adv)
|
|
self.assertEqual(rx, adv_configured)
|
|
|
|
# remove IP address from pg1, verify priority now being adjusted
|
|
self.pg1.unconfig_ip6()
|
|
self.pg0.enable_capture()
|
|
rx = self.pg0.wait_for_packet(timeout=intvl_s, filter_out_fn=is_not_adv)
|
|
self.assertEqual(rx, adv_adjusted)
|
|
|
|
# add IP address to pg1, verify priority now matches configured value
|
|
self.pg1.config_ip6()
|
|
self.pg0.enable_capture()
|
|
rx = self.pg0.wait_for_packet(timeout=intvl_s, filter_out_fn=is_not_adv)
|
|
self.assertEqual(rx, adv_configured)
|
|
|
|
@unittest.skipUnless(config.extended, "part of extended tests")
|
|
def test_vrrp6_master_adv_unicast(self):
|
|
"""IPv6 Master VR advertises (unicast)"""
|
|
|
|
vr_id = 100
|
|
prio = 255
|
|
intvl = self._default_adv
|
|
intvl_s = intvl * 0.01
|
|
vip = self.pg0.local_ip6
|
|
flags = self._default_flags | VRRP_VR_FLAG_UNICAST
|
|
unicast_peer = self.pg0.remote_hosts[4]
|
|
vr = VppVRRPVirtualRouter(
|
|
self, self.pg0, vr_id, prio=prio, intvl=intvl, flags=flags, vips=[vip]
|
|
)
|
|
self._vrs.append(vr)
|
|
vr.add_vpp_config()
|
|
vr.set_unicast_peers([unicast_peer.ip6])
|
|
|
|
# After adding the VR, it should be in the init state
|
|
vr.assert_state_equals(VRRP_VR_STATE_INIT)
|
|
|
|
# Start VR, transition to master
|
|
vr.start_stop(is_start=1)
|
|
vr.assert_state_equals(VRRP_VR_STATE_MASTER)
|
|
|
|
self.pg0.enable_capture()
|
|
rx = self.pg0.wait_for_packet(timeout=intvl_s, filter_out_fn=is_not_adv)
|
|
|
|
self.assertTrue(rx.haslayer(Ether))
|
|
self.assertTrue(rx.haslayer(IPv6))
|
|
self.assertTrue(rx.haslayer(VRRPv3))
|
|
self.assertEqual(rx[Ether].src, self.pg0.local_mac)
|
|
self.assertEqual(rx[Ether].dst, unicast_peer.mac)
|
|
self.assertEqual(
|
|
ip6_normalize(rx[IPv6].src), ip6_normalize(self.pg0.local_ip6_ll)
|
|
)
|
|
self.assertEqual(ip6_normalize(rx[IPv6].dst), ip6_normalize(unicast_peer.ip6))
|
|
self.assertEqual(rx[VRRPv3].vrid, vr_id)
|
|
self.assertEqual(rx[VRRPv3].priority, prio)
|
|
self.assertEqual(rx[VRRPv3].ipcount, 1)
|
|
self.assertEqual(rx[VRRPv3].addrlist, [vip])
|
|
|
|
|
|
if __name__ == "__main__":
|
|
unittest.main(testRunner=VppTestRunner)
|