81119e86bd
Change-Id: I72a1ccdfdd5573335ef78fc01d5268934c73bd31 Signed-off-by: Juraj Sloboda <jsloboda@cisco.com>
450 lines
17 KiB
Python
450 lines
17 KiB
Python
from scapy.layers.dhcp6 import DHCP6_Advertise, DHCP6OptClientId, \
|
|
DHCP6OptStatusCode, DHCP6OptPref, DHCP6OptIA_PD, DHCP6OptIAPrefix, \
|
|
DHCP6OptServerId, DHCP6_Solicit, DHCP6_Reply, DHCP6_Request, DHCP6_Renew, \
|
|
DHCP6_Rebind, DUID_LLT, DHCP6_Release, DHCP6OptElapsedTime
|
|
from scapy.layers.inet6 import IPv6, Ether, UDP
|
|
from scapy.utils6 import in6_mactoifaceid
|
|
from scapy.utils import inet_ntop, inet_pton
|
|
from socket import AF_INET6
|
|
from framework import VppTestCase
|
|
from time import time
|
|
|
|
|
|
def ip6_normalize(ip6):
    """Return the canonical text form of the IPv6 address *ip6*.

    The address is converted to its packed binary form and back to text,
    so differently written spellings of the same address compare equal
    as strings.
    """
    packed = inet_pton(AF_INET6, ip6)
    return inet_ntop(AF_INET6, packed)
|
|
|
|
|
|
def mk_ll_addr(mac):
    """Build a link-local (fe80::) IPv6 address string from *mac*.

    The interface identifier is whatever scapy's in6_mactoifaceid()
    derives from the MAC address; it is appended to the "fe80::" prefix.
    """
    return "fe80::" + in6_mactoifaceid(mac)
|
|
|
|
|
|
class TestDHCPv6PD(VppTestCase):
    """ DHCPv6 PD Data Plane Test Case """

    @classmethod
    def setUpClass(cls):
        """Run the framework's class-level setup; nothing extra is added."""
        super(TestDHCPv6PD, cls).setUpClass()

    def setUp(self):
        """Create pg0, enable IPv6 on it and build the test server's DUID."""
        super(TestDHCPv6PD, self).setUp()

        self.create_pg_interfaces(range(1))
        self.interfaces = list(self.pg_interfaces)
        for intf in self.interfaces:
            intf.admin_up()
            intf.config_ip6()

        # 946684800 is the Unix timestamp of 2000-01-01T00:00:00Z, so the
        # DUID time value is the number of seconds elapsed since then.
        seconds_since_2000 = int(time()) - 946684800
        self.server_duid = DUID_LLT(timeval=seconds_since_2000,
                                    lladdr=self.pg0.remote_mac)

    def tearDown(self):
        """Undo setUp: remove the IPv6 config and shut the interfaces."""
        for intf in self.interfaces:
            intf.unconfig_ip6()
            intf.admin_down()
        super(TestDHCPv6PD, self).tearDown()

    def test_dhcp_send_solicit_receive_advertise(self):
        """ Verify DHCPv6 PD Solicit packet and received Advertise event """

        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()

        # Prefix 1:2:3::/50 in packed form, requested in the client message.
        requested_prefix = {
            'prefix': '\00\01\00\02\00\03' + '\00' * 10,
            'prefix_length': 50,
            'preferred_time': 60,
            'valid_time': 120,
        }
        # NOTE(review): the first argument (1) presumably selects the
        # Solicit message type, which is what the capture below expects.
        self.vapi.dhcp6_pd_send_client_message(1, self.pg0.sw_if_index,
                                               T1=20, T2=40,
                                               prefixes=[requested_prefix])
        captured = self.pg0.get_capture(1)
        self.assertEqual(len(captured), 1)
        solicit = captured[0]

        self.assertTrue(solicit.haslayer(IPv6))
        self.assertTrue(solicit[IPv6].haslayer(DHCP6_Solicit))

        # Remember what identifies this exchange so the Advertise built
        # further down can echo it back.
        client_duid = solicit[DHCP6OptClientId].duid
        trid = solicit[DHCP6_Solicit].trid

        # The message must go from the interface's link-local address to
        # the ff02::1:2 multicast group.
        self.assert_equal(ip6_normalize(solicit[IPv6].dst),
                          ip6_normalize("ff02::1:2"))
        self.assert_equal(ip6_normalize(solicit[IPv6].src),
                          ip6_normalize(self.pg0.local_ip6_ll))

        # The IA_PD option must carry exactly what was requested above.
        ia_pd = solicit[DHCP6OptIA_PD]
        for field, expected in (('T1', 20), ('T2', 40)):
            self.assert_equal(getattr(ia_pd, field), expected)
        self.assert_equal(len(ia_pd.iapdopt), 1)
        sent_prefix = ia_pd.iapdopt[0]
        for field, expected in (('prefix', '1:2:3::'),
                                ('plen', 50),
                                ('preflft', 60),
                                ('validlft', 120)):
            self.assert_equal(getattr(sent_prefix, field), expected)

        self.vapi.want_dhcp6_pd_reply_events()
        self.vapi.dhcp6_clients_enable_disable()

        # Answer with an Advertise that reuses the Solicit's transaction id
        # and client DUID and offers prefix 7:8::/56.
        ia_pd_opts = DHCP6OptIAPrefix(prefix='7:8::', plen=56, preflft=60,
                                      validlft=120)
        ether = Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
        ipv6 = IPv6(src=mk_ll_addr(self.pg0.remote_mac),
                    dst=self.pg0.local_ip6_ll)
        udp = UDP(sport=547, dport=546)
        advertise = (ether / ipv6 / udp /
                     DHCP6_Advertise(trid=trid) /
                     DHCP6OptServerId(duid=self.server_duid) /
                     DHCP6OptClientId(duid=client_duid) /
                     DHCP6OptPref(prefval=7) /
                     DHCP6OptStatusCode(statuscode=1) /
                     DHCP6OptIA_PD(iaid=1, T1=20, T2=40,
                                   iapdopt=ia_pd_opts))
        self.pg0.add_stream([advertise])
        self.pg_start()

        ev = self.vapi.wait_for_event(10, "dhcp6_pd_reply_event")

        # The reported event must mirror the options put in the Advertise.
        for field, expected in (('preference', 7),
                                ('status_code', 1),
                                ('T1', 20),
                                ('T2', 40)):
            self.assert_equal(getattr(ev, field), expected)

        reported_prefix = ev.prefixes[0]
        offered_prefix_bin = inet_pton(AF_INET6,
                                       ia_pd_opts.getfieldval("prefix"))
        self.assert_equal(reported_prefix.prefix, offered_prefix_bin)
        for event_field, option_field in (('prefix_length', 'plen'),
                                          ('preferred_time', 'preflft'),
                                          ('valid_time', 'validlft')):
            self.assert_equal(getattr(reported_prefix, event_field),
                              ia_pd_opts.getfieldval(option_field))
|
|
|
|
|
|
class TestDHCPv6PDControlPlane(VppTestCase):
    """ DHCPv6 PD Control Plane Test Case """

    @classmethod
    def setUpClass(cls):
        """Run the framework's class-level setup; nothing extra is added."""
        super(TestDHCPv6PDControlPlane, cls).setUpClass()

    def setUp(self):
        """Create pg0/pg1 and enable the DHCPv6 PD client on pg0.

        pg0 is the interface the client talks on; pg1 is the interface
        whose FIB addresses the tests inspect.  The set of addresses
        present on pg1 before the client starts is recorded so tests can
        look only at what was added afterwards.
        """
        super(TestDHCPv6PDControlPlane, self).setUp()

        self.create_pg_interfaces(range(2))
        self.interfaces = list(self.pg_interfaces)
        for i in self.interfaces:
            i.admin_up()

        # 946684800 is the Unix timestamp of 2000-01-01T00:00:00Z, so the
        # DUID time value is the number of seconds elapsed since then.
        time_since_2000 = int(time()) - 946684800
        self.server_duid = DUID_LLT(timeval=time_since_2000,
                                    lladdr=self.pg0.remote_mac)
        # Learned from the first client message seen by validate_packet().
        self.client_duid = None
        # Default T1/T2 placed in server messages built by send_packet().
        self.T1 = 1
        self.T2 = 2

        fib = self.vapi.ip6_fib_dump()
        self.initial_addresses = set(self.get_interface_addresses(fib,
                                                                  self.pg1))

        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()

        self.prefix_group = 'my-pd-prefix-group'

        self.vapi.dhcp6_pd_client_enable_disable(
            self.pg0.sw_if_index,
            prefix_group=self.prefix_group)

    def tearDown(self):
        """Disable the PD client on pg0 and shut the interfaces down."""
        self.vapi.dhcp6_pd_client_enable_disable(self.pg0.sw_if_index,
                                                 enable=0)

        for i in self.interfaces:
            i.admin_down()

        super(TestDHCPv6PDControlPlane, self).tearDown()

    @staticmethod
    def get_interface_addresses(fib, pg):
        """Return the /128 FIB entry addresses whose first path is via *pg*.

        :param fib: iterable of FIB entries as returned by ip6_fib_dump()
        :param pg: interface object; only its sw_if_index is used
        :returns: list of the matching entries' ``address`` values
        """
        return [entry.address for entry in fib
                if entry.address_length == 128 and
                entry.path[0].sw_if_index == pg.sw_if_index]

    def get_addresses(self):
        """Return the addresses on pg1 that were not there during setUp."""
        fib = self.vapi.ip6_fib_dump()
        addresses = set(self.get_interface_addresses(fib, self.pg1))
        return addresses.difference(self.initial_addresses)

    def validate_duid_llt(self, duid):
        """Check *duid* by constructing a DUID_LLT from it.

        The constructed object is discarded; any exception raised by the
        constructor propagates to the caller.
        """
        DUID_LLT(duid)

    def validate_packet(self, packet, msg_type, is_resend=False):
        """Verify a captured client message and remember its identifiers.

        :param packet: captured scapy packet
        :param msg_type: scapy DHCPv6 message class the packet must contain
        :param is_resend: True when the packet is expected to be a
            retransmission: the transaction id must then equal the stored
            one and the elapsed time must be non-zero.  Otherwise the
            transaction id is stored in ``self.trid`` and the elapsed time
            must be zero.

        On any failure the packet is dumped with ``packet.show()`` before
        the error is re-raised.
        """
        try:
            self.assertTrue(packet.haslayer(msg_type))
            client_duid = packet[DHCP6OptClientId].duid
            if self.client_duid is None:
                # First message seen: learn the client's DUID.
                self.client_duid = client_duid
                self.validate_duid_llt(client_duid)
            else:
                self.assertEqual(self.client_duid, client_duid)
            # Solicit and Rebind are the two message types for which no
            # server identifier is checked.
            if msg_type not in (DHCP6_Solicit, DHCP6_Rebind):
                server_duid = packet[DHCP6OptServerId].duid
                self.assertEqual(server_duid, self.server_duid)
            if is_resend:
                self.assertEqual(self.trid, packet[msg_type].trid)
            else:
                self.trid = packet[msg_type].trid
            ip = packet[IPv6]
            udp = packet[UDP]
            # Client messages go to the ff02::1:2 multicast group, from
            # UDP port 546 to UDP port 547.
            self.assertEqual(ip.dst, 'ff02::1:2')
            self.assertEqual(udp.sport, 546)
            self.assertEqual(udp.dport, 547)
            dhcpv6 = packet[msg_type]
            elapsed_time = dhcpv6[DHCP6OptElapsedTime]
            if is_resend:
                self.assertNotEqual(elapsed_time.elapsedtime, 0)
            else:
                self.assertEqual(elapsed_time.elapsedtime, 0)
        except Exception:
            # Show the offending packet to make the failure diagnosable.
            packet.show()
            raise

    def wait_for_packet(self, msg_type, timeout=None, is_resend=False):
        """Capture one packet on pg0 and validate it as *msg_type*.

        :param msg_type: expected scapy DHCPv6 message class
        :param timeout: capture timeout passed to get_capture(); defaults
            to 3 when None
        :param is_resend: forwarded to validate_packet()
        """
        if timeout is None:
            timeout = 3
        rx_list = self.pg0.get_capture(1, timeout=timeout)
        packet = rx_list[0]
        self.validate_packet(packet, msg_type, is_resend=is_resend)

    def wait_for_solicit(self, timeout=None, is_resend=False):
        """Wait for and validate a Solicit message from the client."""
        self.wait_for_packet(DHCP6_Solicit, timeout, is_resend=is_resend)

    def wait_for_request(self, timeout=None, is_resend=False):
        """Wait for and validate a Request message from the client."""
        self.wait_for_packet(DHCP6_Request, timeout, is_resend=is_resend)

    def wait_for_renew(self, timeout=None, is_resend=False):
        """Wait for and validate a Renew message from the client."""
        self.wait_for_packet(DHCP6_Renew, timeout, is_resend=is_resend)

    def wait_for_rebind(self, timeout=None, is_resend=False):
        """Wait for and validate a Rebind message from the client."""
        self.wait_for_packet(DHCP6_Rebind, timeout, is_resend=is_resend)

    def wait_for_release(self, timeout=None, is_resend=False):
        """Wait for and validate a Release message from the client."""
        self.wait_for_packet(DHCP6_Release, timeout, is_resend=is_resend)

    def send_packet(self, msg_type, t1=None, t2=None, iapdopt=None):
        """Inject a DHCPv6 message of *msg_type* into pg0.

        The message reuses the transaction id and client DUID recorded by
        validate_packet(), so a client message must have been validated
        first.

        :param msg_type: scapy DHCPv6 message class to instantiate
        :param t1: IA_PD T1 value; defaults to ``self.T1``
        :param t2: IA_PD T2 value; defaults to ``self.T2``
        :param iapdopt: option(s) nested in the IA_PD option; when None
            the IA_PD option is built without the ``iapdopt`` argument
        """
        if t1 is None:
            t1 = self.T1
        if t2 is None:
            t2 = self.T2
        if iapdopt is None:
            opt_ia_pd = DHCP6OptIA_PD(iaid=1, T1=t1, T2=t2)
        else:
            opt_ia_pd = DHCP6OptIA_PD(iaid=1, T1=t1, T2=t2, iapdopt=iapdopt)
        p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
             IPv6(src=mk_ll_addr(self.pg0.remote_mac),
                  dst=self.pg0.local_ip6_ll) /
             UDP(sport=547, dport=546) /
             msg_type(trid=self.trid) /
             DHCP6OptServerId(duid=self.server_duid) /
             DHCP6OptClientId(duid=self.client_duid) /
             opt_ia_pd
             )
        self.pg0.add_stream([p])
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()

    def send_advertise(self, t1=None, t2=None, iapdopt=None):
        """Inject an Advertise message; see send_packet() for arguments."""
        self.send_packet(DHCP6_Advertise, t1, t2, iapdopt)

    def send_reply(self, t1=None, t2=None, iapdopt=None):
        """Inject a Reply message; see send_packet() for arguments."""
        self.send_packet(DHCP6_Reply, t1, t2, iapdopt)

    def _add_address_using_prefix(self, address_bin, prefix_length):
        """Configure an address on pg1 derived from the test prefix group."""
        self.vapi.ip6_add_del_address_using_prefix(self.pg1.sw_if_index,
                                                   address_bin,
                                                   prefix_length,
                                                   self.prefix_group)

    def _del_address_using_prefix(self, address_bin, prefix_length):
        """Remove an address added by _add_address_using_prefix()."""
        self.vapi.ip6_add_del_address_using_prefix(self.pg1.sw_if_index,
                                                   address_bin,
                                                   prefix_length,
                                                   self.prefix_group,
                                                   is_add=0)

    def test_T1_and_T2_timeouts(self):
        """ Test T1 and T2 timeouts """

        self.wait_for_solicit()
        self.send_advertise()
        self.wait_for_request()
        # The Reply carries the defaults from setUp: T1=1 and T2=2.
        self.send_reply()

        self.sleep(1)

        # After T1 the client is expected to send a Renew ...
        self.wait_for_renew()

        self.pg_enable_capture(self.pg_interfaces)

        self.sleep(1)

        # ... and, with the Renew left unanswered, a Rebind after T2.
        self.wait_for_rebind()

    def test_prefixes(self):
        """ Test handling of prefixes """

        # (address, prefix length) pairs that were successfully configured.
        # Only these are removed during cleanup, so a failing add call is
        # not followed by a delete of something that was never added.
        added = []
        try:
            address_bin_1 = '\x00' * 6 + '\x00\x02' + '\x00' * 6 + '\x04\x05'
            address_prefix_length_1 = 60
            self._add_address_using_prefix(address_bin_1,
                                           address_prefix_length_1)
            added.append((address_bin_1, address_prefix_length_1))

            ia_pd_opts = DHCP6OptIAPrefix(prefix='7:8::', plen=56, preflft=2,
                                          validlft=3)

            self.wait_for_solicit()
            self.send_advertise(t1=20, t2=40, iapdopt=ia_pd_opts)
            self.wait_for_request()
            self.send_reply(t1=20, t2=40, iapdopt=ia_pd_opts)
            self.sleep(0.1)

            # check FIB for new address
            new_addresses = self.get_addresses()
            self.assertEqual(len(new_addresses), 1)
            addr = list(new_addresses)[0]
            self.assertEqual(inet_ntop(AF_INET6, addr), '7:8:0:2::405')

            self.sleep(1)

            address_bin_2 = '\x00' * 6 + '\x00\x76' + '\x00' * 6 + '\x04\x06'
            address_prefix_length_2 = 62
            self._add_address_using_prefix(address_bin_2,
                                           address_prefix_length_2)
            added.append((address_bin_2, address_prefix_length_2))

            self.sleep(1)

            # check FIB contains 2 addresses (set order is arbitrary, so
            # compare the sorted text forms)
            new_addresses = self.get_addresses()
            self.assertEqual(len(new_addresses), 2)
            self.assertEqual(
                sorted(inet_ntop(AF_INET6, a) for a in new_addresses),
                ['7:8:0:2::405', '7:8:0:76::406'])

            self.sleep(1)

            # check that the addresses are deleted
            # NOTE(review): about 3s have passed since the Reply, which
            # matches validlft=3 -- presumably the prefix has expired.
            self.assertEqual(len(self.get_addresses()), 0)

        finally:
            for address_bin, address_prefix_length in added:
                self._del_address_using_prefix(address_bin,
                                               address_prefix_length)

    def test_sending_client_messages_solicit(self):
        """ VPP receives messages from DHCPv6 client """

        self.wait_for_solicit()
        # Feed the client message types that normally only a client sends.
        self.send_packet(DHCP6_Solicit)
        self.send_packet(DHCP6_Request)
        self.send_packet(DHCP6_Renew)
        self.send_packet(DHCP6_Rebind)
        self.sleep(1)
        # The client must still be retransmitting its original Solicit.
        self.wait_for_solicit(is_resend=True)

    def test_sending_inapropriate_packets(self):
        """ Server sends messages with inappropriate message types """

        self.wait_for_solicit()
        # A Reply while soliciting must not advance the client.
        self.send_reply()
        self.wait_for_solicit(is_resend=True)
        self.send_advertise()
        self.wait_for_request()
        # An Advertise while requesting must not advance the client.
        self.send_advertise()
        self.wait_for_request(is_resend=True)
        self.send_reply()
        self.wait_for_renew()

    def test_no_prefix_available_in_advertise(self):
        """ Advertise message contains NoPrefixAvail status code """

        self.wait_for_solicit()
        noavail = DHCP6OptStatusCode(statuscode=6)  # NoPrefixAvail
        self.send_advertise(iapdopt=noavail)
        # The client is expected to keep soliciting.
        self.wait_for_solicit(is_resend=True)

    def test_preferred_greater_than_valit_lifetime(self):
        """ Preferred lifetime is greater than valid lifetime """

        address_bin = '\x00' * 6 + '\x00\x02' + '\x00' * 6 + '\x04\x05'
        address_prefix_length = 60
        # Added outside the try block: if the add itself fails there is
        # nothing to clean up and the original error is reported as is.
        self._add_address_using_prefix(address_bin, address_prefix_length)
        try:
            self.wait_for_solicit()
            self.send_advertise()
            self.wait_for_request()
            # preflft (4) deliberately exceeds validlft (3).
            ia_pd_opts = DHCP6OptIAPrefix(prefix='7:8::', plen=56, preflft=4,
                                          validlft=3)
            self.send_reply(iapdopt=ia_pd_opts)

            self.sleep(0.5)

            # check FIB contains no addresses
            self.assertEqual(len(self.get_addresses()), 0)

        finally:
            self._del_address_using_prefix(address_bin,
                                           address_prefix_length)

    def test_T1_greater_than_T2(self):
        """ T1 is greater than T2 """

        address_bin = '\x00' * 6 + '\x00\x02' + '\x00' * 6 + '\x04\x05'
        address_prefix_length = 60
        # Added outside the try block: if the add itself fails there is
        # nothing to clean up and the original error is reported as is.
        self._add_address_using_prefix(address_bin, address_prefix_length)
        try:
            self.wait_for_solicit()
            self.send_advertise()
            self.wait_for_request()
            ia_pd_opts = DHCP6OptIAPrefix(prefix='7:8::', plen=56, preflft=4,
                                          validlft=8)
            # t1 (80) deliberately exceeds t2 (40).
            self.send_reply(t1=80, t2=40, iapdopt=ia_pd_opts)

            self.sleep(0.5)

            # check FIB contains no addresses
            self.assertEqual(len(self.get_addresses()), 0)

        finally:
            self._del_address_using_prefix(address_bin,
                                           address_prefix_length)
|