vpp/test/test_cdp.py

165 lines
4.4 KiB
Python
Raw Normal View History

#!/usr/bin/env python3
""" CDP tests """
from scapy.packet import Packet
from scapy.all import ShortField, StrField
from scapy.layers.l2 import Dot3, LLC, SNAP
from scapy.contrib.cdp import (
CDPMsgDeviceID,
CDPMsgSoftwareVersion,
CDPMsgPlatform,
CDPMsgPortID,
CDPv2_HDR,
)
from framework import VppTestCase
from scapy.all import raw
from re import compile
from time import sleep
from util import ppp
from config import config
import platform
import sys
import unittest
""" TestCDP is a subclass of VPPTestCase classes.
CDP test.
"""
class CustomTLV(Packet):
    """Custom TLV protocol layer for scapy.

    A bare type/length/value record.  Nothing in this class recomputes
    ``length`` from ``value``, so the caller can set a length that does
    not match the payload (the tests in this file rely on that to build
    malformed CDP TLVs).
    """

    fields_desc = [
        # TLV type code.
        ShortField("type", 0),
        # Declared TLV length; taken verbatim from the caller.
        ShortField("length", 4),
        # TLV payload.
        StrField("value", ""),
    ]
@unittest.skipIf("cdp" in config.excluded_plugins, "Exclude CDP plugin tests")
class TestCDP(VppTestCase):
    """CDP Test Case"""

    # NOTE(review): the test_* methods below are documented with comments
    # rather than docstrings on purpose: unittest's shortDescription() uses
    # the first docstring line, so adding docstrings would change the
    # descriptions reported for these tests.

    # Detects the "not enabled" notice in the "show cdp" output.
    nen_ptr = compile(r"not enabled")
    # One row of "show cdp" output: four whitespace-separated columns, of
    # which show_cdp() uses the first two (port, system).
    cdp_ptr = compile(r"^([-\.\w]+)\s+([-\.\w]+)\s+([-\.\w]+)\s+([-\.\w]+)$")
    # Three-column pattern; not referenced by any method of this class and
    # kept only so the class attributes stay unchanged.
    err_ptr = compile(r"^([\d]+)\s+([-\w]+)\s+([ -\.\w)(]+)$")

    @property
    def device_id(self):
        """Node name of the test host, sent as the CDP device ID."""
        return platform.node()

    @property
    def version(self):
        """OS release of the test host, sent as the CDP software version."""
        return platform.release()

    @property
    def port_id(self):
        """Name of the pg interface under test, sent as the CDP port ID."""
        return self.interface.name

    @property
    def platform(self):
        """OS name of the test host, sent as the CDP platform."""
        # Inside the method body ``platform`` resolves to the imported
        # module (global scope), not to this property.
        return platform.system()

    @classmethod
    def setUpClass(cls):
        """Create a single pg interface and bring it up with IPv4 and ARP.

        If any step fails, the base-class teardown is run before the
        exception is re-raised.
        """
        super().setUpClass()
        try:
            cls.create_pg_interfaces(range(1))
            cls.interface = cls.pg_interfaces[0]
            cls.interface.admin_up()
            cls.interface.config_ip4()
            cls.interface.resolve_arp()
        except Exception:
            super().tearDownClass()
            raise

    @classmethod
    def tearDownClass(cls):
        """Delegate class-level teardown to the base class."""
        super().tearDownClass()

    def test_enable_cdp(self):
        # Enable CDP and verify that "show cdp" no longer reports it as
        # "not enabled".
        self.logger.info(self.vapi.cdp_enable_disable(enable_disable=1))
        ret = self.vapi.cli("show cdp")
        self.logger.info(ret)

        not_enabled = self.nen_ptr.search(ret)
        self.assertFalse(not_enabled, "CDP isn't enabled")

    def test_send_cdp_packet(self):
        # Send a well-formed CDP packet and verify that the first neighbor
        # listed by "show cdp" carries our port ID and device ID.
        self.logger.info(self.vapi.cdp_enable_disable(enable_disable=1))
        self.send_packet(self.create_packet())

        neighbors = list(self.show_cdp())
        self.assertTrue(neighbors, "CDP didn't register neighbor")

        port, system = neighbors[0]
        # Compare only the common prefix: either string may be the shorter
        # one (presumably the CLI can truncate the device ID -- confirm).
        length = min(len(system), len(self.device_id))

        self.assert_equal(port, self.port_id, "CDP received invalid port id")
        self.assert_equal(
            system[:length], self.device_id[:length], "CDP received invalid device id"
        )

    def test_cdp_underflow_tlv(self):
        # Declared TLV length (3) is smaller than the 4 bytes taken by the
        # type and length fields themselves.
        self.send_bad_packet(3, ".")

    def test_cdp_overflow_tlv(self):
        # Declared TLV length (8) is larger than the 5 bytes actually sent
        # (type + length + one value byte).
        self.send_bad_packet(8, ".")

    def send_bad_packet(self, l, v):
        """Send a CDP packet with a malformed TLV and check that it is counted.

        :param l: value written into the TLV ``length`` field.
        :param v: TLV payload.

        The bad-TLV error counter is sampled before and after the packet is
        sent and must have grown.  Checking only the absolute value would
        let a later test pass on the strength of an increment made by an
        earlier one.
        """
        counter = "/err/cdp-input/cdp packets with bad TLVs"

        self.logger.info(self.vapi.cdp_enable_disable(enable_disable=1))
        before = self.statistics.get_err_counter(counter)
        self.send_packet(self.create_bad_packet(l, v))
        after = self.statistics.get_err_counter(counter)

        self.assertGreater(after, before, "CDP didn't drop bad packet")

    def send_packet(self, packet):
        """Log ``packet``, queue it on the test interface and start the pg."""
        self.logger.debug(ppp("Sending packet:", packet))
        self.interface.add_stream(packet)
        self.pg_start()

    def create_base_packet(self):
        """Return the 802.3/LLC/SNAP/CDPv2 header stack without any TLVs."""
        packet = (
            Dot3(src=self.interface.remote_mac, dst="01:00:0c:cc:cc:cc")
            / LLC(dsap=0xAA, ssap=0xAA, ctrl=0x03)
            / SNAP()
            / CDPv2_HDR()
        )
        return packet

    def create_packet(self):
        """Return a CDP packet advertising this host's identity.

        The base headers are followed by device ID, software version,
        port ID and platform TLVs built from the matching properties.
        """
        packet = (
            self.create_base_packet()
            / CDPMsgDeviceID(val=self.device_id)
            / CDPMsgSoftwareVersion(val=self.version)
            / CDPMsgPortID(iface=self.port_id)
            / CDPMsgPlatform(val=self.platform)
        )
        return packet

    def create_bad_packet(self, tl=4, tv=""):
        """Return a CDP packet carrying one type-1 ``CustomTLV``.

        :param tl: value for the TLV ``length`` field (not validated).
        :param tv: TLV payload.
        """
        packet = self.create_base_packet() / CustomTLV(type=1, length=tl, value=tv)
        return packet

    def process_cli(self, exp, ptr):
        """Run CLI command ``exp`` and yield regex groups of matching lines.

        The first line of the output is skipped; every remaining line is
        stripped and matched against the compiled pattern ``ptr``.
        """
        for line in self.vapi.cli(exp).split("\n")[1:]:
            m = ptr.match(line.strip())
            if m:
                yield m.groups()

    def show_cdp(self):
        """Yield ``(port, system)`` for each row parsed from "show cdp".

        ``cdp_ptr`` always produces exactly four groups, so each row is
        unpacked directly; the last two columns are ignored.
        """
        for port, system, _, _ in self.process_cli("show cdp", self.cdp_ptr):
            yield port, system