vpp/test/test_cdp.py
Klement Sekera d9b0c6fbf7 tests: replace pycodestyle with black
Drop pycodestyle for code style checking in favor of black. Black is
much faster, stable PEP8 compliant code style checker offering also
automatic formatting. It aims to be very stable and produce smallest
diffs. It's used by many small and big projects.

Running checkstyle with black takes a few seconds with a terse output.
Thus, test-checkstyle-diff is no longer necessary.

Expand scope of checkstyle to all python files in the repo, replacing
test-checkstyle with checkstyle-python.

Also, fixstyle-python is now available for automatic style formatting.

Note: python virtualenv has been consolidated in test/Makefile,
test/requirements*.txt which will eventually be moved to a central
location.  This is required to simply the automated generation of
docker executor images in the CI.

Type: improvement
Change-Id: I022a326603485f58585e879ac0f697fceefbc9c8
Signed-off-by: Klement Sekera <klement.sekera@gmail.com>
Signed-off-by: Dave Wallace <dwallacelf@gmail.com>
2022-05-10 18:52:08 +00:00

163 lines
4.3 KiB
Python

#!/usr/bin/env python3
""" CDP tests """
from scapy.packet import Packet
from scapy.all import ShortField, StrField
from scapy.layers.l2 import Dot3, LLC, SNAP
from scapy.contrib.cdp import (
CDPMsgDeviceID,
CDPMsgSoftwareVersion,
CDPMsgPlatform,
CDPMsgPortID,
CDPv2_HDR,
)
from framework import VppTestCase
from scapy.all import raw
from re import compile
from time import sleep
from util import ppp
import platform
import sys
import unittest
""" TestCDP is a subclass of VPPTestCase classes.
CDP test.
"""
class CustomTLV(Packet):
"""Custom TLV protocol layer for scapy"""
fields_desc = [
ShortField("type", 0),
ShortField("length", 4),
StrField("value", ""),
]
class TestCDP(VppTestCase):
"""CDP Test Case"""
nen_ptr = compile(r"not enabled")
cdp_ptr = compile(r"^([-\.\w]+)\s+([-\.\w]+)\s+([-\.\w]+)\s+([-\.\w]+)$")
err_ptr = compile(r"^([\d]+)\s+([-\w]+)\s+([ -\.\w)(]+)$")
@property
def device_id(self):
return platform.node()
@property
def version(self):
return platform.release()
@property
def port_id(self):
return self.interface.name
@property
def platform(self):
return platform.system()
@classmethod
def setUpClass(cls):
super(TestCDP, cls).setUpClass()
try:
cls.create_pg_interfaces(range(1))
cls.interface = cls.pg_interfaces[0]
cls.interface.admin_up()
cls.interface.config_ip4()
cls.interface.resolve_arp()
except Exception:
super(TestCDP, cls).tearDownClass()
raise
@classmethod
def tearDownClass(cls):
super(TestCDP, cls).tearDownClass()
def test_enable_cdp(self):
self.logger.info(self.vapi.cdp_enable_disable(enable_disable=1))
ret = self.vapi.cli("show cdp")
self.logger.info(ret)
not_enabled = self.nen_ptr.search(ret)
self.assertFalse(not_enabled, "CDP isn't enabled")
def test_send_cdp_packet(self):
self.logger.info(self.vapi.cdp_enable_disable(enable_disable=1))
self.send_packet(self.create_packet())
neighbors = list(self.show_cdp())
self.assertTrue(neighbors, "CDP didn't register neighbor")
port, system = neighbors[0]
length = min(len(system), len(self.device_id))
self.assert_equal(port, self.port_id, "CDP received invalid port id")
self.assert_equal(
system[:length], self.device_id[:length], "CDP received invalid device id"
)
def test_cdp_underflow_tlv(self):
self.send_bad_packet(3, ".")
def test_cdp_overflow_tlv(self):
self.send_bad_packet(8, ".")
def send_bad_packet(self, l, v):
self.logger.info(self.vapi.cdp_enable_disable(enable_disable=1))
self.send_packet(self.create_bad_packet(l, v))
err = self.statistics.get_err_counter(
"/err/cdp-input/cdp packets with bad TLVs"
)
self.assertTrue(err >= 1, "CDP didn't drop bad packet")
def send_packet(self, packet):
self.logger.debug(ppp("Sending packet:", packet))
self.interface.add_stream(packet)
self.pg_start()
def create_base_packet(self):
packet = (
Dot3(src=self.interface.remote_mac, dst="01:00:0c:cc:cc:cc")
/ LLC(dsap=0xAA, ssap=0xAA, ctrl=0x03)
/ SNAP()
/ CDPv2_HDR()
)
return packet
def create_packet(self):
packet = (
self.create_base_packet()
/ CDPMsgDeviceID(val=self.device_id)
/ CDPMsgSoftwareVersion(val=self.version)
/ CDPMsgPortID(iface=self.port_id)
/ CDPMsgPlatform(val=self.platform)
)
return packet
def create_bad_packet(self, tl=4, tv=""):
packet = self.create_base_packet() / CustomTLV(type=1, length=tl, value=tv)
return packet
def process_cli(self, exp, ptr):
for line in self.vapi.cli(exp).split("\n")[1:]:
m = ptr.match(line.strip())
if m:
yield m.groups()
def show_cdp(self):
for pack in self.process_cli("show cdp", self.cdp_ptr):
try:
port, system, _, _ = pack
except ValueError:
pass
else:
yield port, system