refactor test framework

Change-Id: I31da3b1857b6399f9899276a2d99cdd19436296c
Signed-off-by: Klement Sekera <ksekera@cisco.com>
Signed-off-by: Matej Klotton <mklotton@cisco.com>
Signed-off-by: Jan Gelety <jgelety@cisco.com>
Signed-off-by: Juraj Sloboda <jsloboda@cisco.com>
This commit is contained in:
Klement Sekera
2016-10-11 11:47:09 +02:00
committed by Matej Klotton
parent f530a5526a
commit f62ae1288a
20 changed files with 2626 additions and 1835 deletions
+20 -21
View File
@@ -188,37 +188,36 @@ plugins-release: $(BR)/.bootstrap.ok
build-vpp-api: $(BR)/.bootstrap.ok
$(call make,$(PLATFORM)_debug,vpp-api-install)
PYTHON_PATH=$(BR)/python
define test
@make -C test \
VPP_TEST_BIN=$(BR)/install-$(1)-native/vpp/bin/vpp \
VPP_TEST_API_TEST_BIN=$(BR)/install-$(1)-native/vpp-api-test/bin/vpp_api_test \
VPP_TEST_PLUGIN_PATH=$(BR)/install-$(1)-native/plugins/lib64/vpp_plugins \
V=$(V) TEST=$(TEST)
$(if $(filter-out $(3),retest),make -C $(BR) PLATFORM=$(1) TAG=$(2) vpp-api-install plugins-install vpp-install vpp-api-test-install,)
make -C test \
VPP_TEST_BIN=$(BR)/install-$(2)-native/vpp/bin/vpp \
VPP_TEST_API_TEST_BIN=$(BR)/install-$(2)-native/vpp-api-test/bin/vpp_api_test \
VPP_TEST_PLUGIN_PATH=$(BR)/install-$(2)-native/plugins/lib64/vpp_plugins \
LD_LIBRARY_PATH=$(BR)/install-$(2)-native/vpp-api/lib64/ \
WS_ROOT=$(WS_ROOT) I=$(I) V=$(V) TEST=$(TEST) PYTHON_PATH=$(PYTHON_PATH) $(3)
endef
test: bootstrap
ifeq ($(OS_ID),ubuntu)
@if ! (dpkg -l python-dev python-scapy &> /dev/null); then \
sudo -E apt-get $(CONFIRM) $(FORCE) install python-dev python-scapy; \
fi
endif
@make -C $(BR) PLATFORM=vpp_lite TAG=vpp_lite vpp-api-install plugins-install vpp-install vpp-api-test-install
$(call test,vpp_lite)
$(call test,vpp_lite,vpp_lite,test)
test-debug: bootstrap
ifeq ($(OS_ID),ubuntu)
@if ! (dpkg -l python-dev python-scapy &> /dev/null); then \
sudo -E apt-get $(CONFIRM) $(FORCE) install python-dev python-scapy; \
fi
endif
@make -C $(BR) PLATFORM=vpp_lite TAG=vpp_lite_debug vpp-api-install plugins-install vpp-install vpp-api-test-install
$(call test,vpp_lite_debug)
$(call test,vpp_lite,vpp_lite_debug,test)
test-doc:
make -C $(BR) PLATFORM=vpp_lite TAG=vpp_lite vpp-api-install plugins-install vpp-install vpp-api-test-install
make -C test PYTHON_PATH=$(PYTHON_PATH) LD_LIBRARY_PATH=$(BR)/install-vpp_lite-native/vpp-api/lib64/ doc
test-clean:
make -C test clean
retest:
$(call test,vpp_lite)
$(call test,vpp_lite,vpp_lite,retest)
retest-debug:
$(call test,vpp_lite_debug)
$(call test,vpp_lite,vpp_lite_debug,retest)
STARTUP_DIR ?= $(PWD)
ifeq ("$(wildcard $(STARTUP_CONF))","")
+1
View File
@@ -1164,6 +1164,7 @@ distclean:
rm -rf $(MU_BUILD_ROOT_DIR)/*.deb
rm -rf $(MU_BUILD_ROOT_DIR)/*.rpm
rm -rf $(MU_BUILD_ROOT_DIR)/*.changes
rm -rf $(MU_BUILD_ROOT_DIR)/python
if [ -e /usr/bin/dh ];then (cd $(MU_BUILD_ROOT_DIR)/deb/;debian/rules clean); fi
rm -f $(MU_BUILD_ROOT_DIR)/deb/debian/*.install
rm -f $(MU_BUILD_ROOT_DIR)/deb/debian/*.dkms
+21 -2
View File
@@ -1,3 +1,22 @@
PYTHON_VENV_PATH=$(PYTHON_PATH)/virtualenv
all:
@python run_tests.py discover -p test_$(TEST)"*.py"
test: clean
@virtualenv $(PYTHON_VENV_PATH)
@bash -c "source $(PYTHON_VENV_PATH)/bin/activate && pip install scapy"
@bash -c "source $(PYTHON_VENV_PATH)/bin/activate && pip install pexpect"
@bash -c "source $(PYTHON_VENV_PATH)/bin/activate && cd $(WS_ROOT)/vpp-api/python && python setup.py install"
@bash -c "source $(PYTHON_VENV_PATH)/bin/activate && python run_tests.py discover -p test_$(TEST)\"*.py\""
retest: clean
@bash -c "source $(PYTHON_VENV_PATH)/bin/activate && python run_tests.py discover -p test_$(TEST)\"*.py\""
.PHONY: clean doc
clean:
@rm -f /dev/shm/vpp-unittest-*
@rm -rf /tmp/vpp-unittest-*
doc:
@virtualenv $(PYTHON_VENV_PATH)
@bash -c "source $(PYTHON_VENV_PATH)/bin/activate && pip install sphinx"
@bash -c "source $(PYTHON_VENV_PATH)/bin/activate && make -C doc html"
+229
View File
@@ -0,0 +1,229 @@
# Makefile for Sphinx documentation
#
# You can set these variables from the command line.
SPHINXOPTS =
SPHINXBUILD = sphinx-build
PAPER =
BUILDDIR = _build
# Internal variables.
PAPEROPT_a4 = -D latex_paper_size=a4
PAPEROPT_letter = -D latex_paper_size=letter
ALLSPHINXOPTS = -d $(BUILDDIR)/doctrees $(PAPEROPT_$(PAPER)) $(SPHINXOPTS) .
# the i18n builder cannot share the environment and doctrees with the others
I18NSPHINXOPTS = $(PAPEROPT_$(PAPER)) $(SPHINXOPTS) .
.PHONY: regen-api-doc
regen-api-doc:
sphinx-apidoc -o . ..
.PHONY: help
help:
@echo "Please use \`make <target>' where <target> is one of"
@echo " html to make standalone HTML files"
@echo " dirhtml to make HTML files named index.html in directories"
@echo " singlehtml to make a single large HTML file"
@echo " pickle to make pickle files"
@echo " json to make JSON files"
@echo " htmlhelp to make HTML files and a HTML help project"
@echo " qthelp to make HTML files and a qthelp project"
@echo " applehelp to make an Apple Help Book"
@echo " devhelp to make HTML files and a Devhelp project"
@echo " epub to make an epub"
@echo " epub3 to make an epub3"
@echo " latex to make LaTeX files, you can set PAPER=a4 or PAPER=letter"
@echo " latexpdf to make LaTeX files and run them through pdflatex"
@echo " latexpdfja to make LaTeX files and run them through platex/dvipdfmx"
@echo " text to make text files"
@echo " man to make manual pages"
@echo " texinfo to make Texinfo files"
@echo " info to make Texinfo files and run them through makeinfo"
@echo " gettext to make PO message catalogs"
@echo " changes to make an overview of all changed/added/deprecated items"
@echo " xml to make Docutils-native XML files"
@echo " pseudoxml to make pseudoxml-XML files for display purposes"
@echo " linkcheck to check all external links for integrity"
@echo " doctest to run all doctests embedded in the documentation (if enabled)"
@echo " coverage to run coverage check of the documentation (if enabled)"
@echo " dummy to check syntax errors of document sources"
.PHONY: clean
clean:
rm -rf $(BUILDDIR)/*
.PHONY: html
html: regen-api-doc
$(SPHINXBUILD) -b html $(ALLSPHINXOPTS) $(BUILDDIR)/html
@echo
@echo "Build finished. The HTML pages are in $(BUILDDIR)/html."
.PHONY: dirhtml
dirhtml: regen-api-doc
$(SPHINXBUILD) -b dirhtml $(ALLSPHINXOPTS) $(BUILDDIR)/dirhtml
@echo
@echo "Build finished. The HTML pages are in $(BUILDDIR)/dirhtml."
.PHONY: singlehtml
singlehtml: regen-api-doc
$(SPHINXBUILD) -b singlehtml $(ALLSPHINXOPTS) $(BUILDDIR)/singlehtml
@echo
@echo "Build finished. The HTML page is in $(BUILDDIR)/singlehtml."
.PHONY: pickle
pickle: regen-api-doc
$(SPHINXBUILD) -b pickle $(ALLSPHINXOPTS) $(BUILDDIR)/pickle
@echo
@echo "Build finished; now you can process the pickle files."
.PHONY: json
json: regen-api-doc
$(SPHINXBUILD) -b json $(ALLSPHINXOPTS) $(BUILDDIR)/json
@echo
@echo "Build finished; now you can process the JSON files."
.PHONY: htmlhelp
htmlhelp: regen-api-doc
$(SPHINXBUILD) -b htmlhelp $(ALLSPHINXOPTS) $(BUILDDIR)/htmlhelp
@echo
@echo "Build finished; now you can run HTML Help Workshop with the" \
".hhp project file in $(BUILDDIR)/htmlhelp."
.PHONY: qthelp
qthelp: regen-api-doc
$(SPHINXBUILD) -b qthelp $(ALLSPHINXOPTS) $(BUILDDIR)/qthelp
@echo
@echo "Build finished; now you can run "qcollectiongenerator" with the" \
".qhcp project file in $(BUILDDIR)/qthelp, like this:"
@echo "# qcollectiongenerator $(BUILDDIR)/qthelp/VPPtestframework.qhcp"
@echo "To view the help file:"
@echo "# assistant -collectionFile $(BUILDDIR)/qthelp/VPPtestframework.qhc"
.PHONY: applehelp
applehelp: regen-api-doc
$(SPHINXBUILD) -b applehelp $(ALLSPHINXOPTS) $(BUILDDIR)/applehelp
@echo
@echo "Build finished. The help book is in $(BUILDDIR)/applehelp."
@echo "N.B. You won't be able to view it unless you put it in" \
"~/Library/Documentation/Help or install it in your application" \
"bundle."
.PHONY: devhelp
devhelp: regen-api-doc
$(SPHINXBUILD) -b devhelp $(ALLSPHINXOPTS) $(BUILDDIR)/devhelp
@echo
@echo "Build finished."
@echo "To view the help file:"
@echo "# mkdir -p $$HOME/.local/share/devhelp/VPPtestframework"
@echo "# ln -s $(BUILDDIR)/devhelp $$HOME/.local/share/devhelp/VPPtestframework"
@echo "# devhelp"
.PHONY: epub
epub: regen-api-doc
$(SPHINXBUILD) -b epub $(ALLSPHINXOPTS) $(BUILDDIR)/epub
@echo
@echo "Build finished. The epub file is in $(BUILDDIR)/epub."
.PHONY: epub3
epub3:
$(SPHINXBUILD) -b epub3 $(ALLSPHINXOPTS) $(BUILDDIR)/epub3
@echo
@echo "Build finished. The epub3 file is in $(BUILDDIR)/epub3."
.PHONY: latex
latex: regen-api-doc
$(SPHINXBUILD) -b latex $(ALLSPHINXOPTS) $(BUILDDIR)/latex
@echo
@echo "Build finished; the LaTeX files are in $(BUILDDIR)/latex."
@echo "Run \`make' in that directory to run these through (pdf)latex" \
"(use \`make latexpdf' here to do that automatically)."
.PHONY: latexpdf
latexpdf: regen-api-doc
$(SPHINXBUILD) -b latex $(ALLSPHINXOPTS) $(BUILDDIR)/latex
@echo "Running LaTeX files through pdflatex..."
$(MAKE) -C $(BUILDDIR)/latex all-pdf
@echo "pdflatex finished; the PDF files are in $(BUILDDIR)/latex."
.PHONY: latexpdfja
latexpdfja: regen-api-doc
$(SPHINXBUILD) -b latex $(ALLSPHINXOPTS) $(BUILDDIR)/latex
@echo "Running LaTeX files through platex and dvipdfmx..."
$(MAKE) -C $(BUILDDIR)/latex all-pdf-ja
@echo "pdflatex finished; the PDF files are in $(BUILDDIR)/latex."
.PHONY: text
text: regen-api-doc
$(SPHINXBUILD) -b text $(ALLSPHINXOPTS) $(BUILDDIR)/text
@echo
@echo "Build finished. The text files are in $(BUILDDIR)/text."
.PHONY: man
man: regen-api-doc
$(SPHINXBUILD) -b man $(ALLSPHINXOPTS) $(BUILDDIR)/man
@echo
@echo "Build finished. The manual pages are in $(BUILDDIR)/man."
.PHONY: texinfo
texinfo: regen-api-doc
$(SPHINXBUILD) -b texinfo $(ALLSPHINXOPTS) $(BUILDDIR)/texinfo
@echo
@echo "Build finished. The Texinfo files are in $(BUILDDIR)/texinfo."
@echo "Run \`make' in that directory to run these through makeinfo" \
"(use \`make info' here to do that automatically)."
.PHONY: info
info: regen-api-doc
$(SPHINXBUILD) -b texinfo $(ALLSPHINXOPTS) $(BUILDDIR)/texinfo
@echo "Running Texinfo files through makeinfo..."
make -C $(BUILDDIR)/texinfo info
@echo "makeinfo finished; the Info files are in $(BUILDDIR)/texinfo."
.PHONY: gettext
gettext: regen-api-doc
$(SPHINXBUILD) -b gettext $(I18NSPHINXOPTS) $(BUILDDIR)/locale
@echo
@echo "Build finished. The message catalogs are in $(BUILDDIR)/locale."
.PHONY: changes
changes: regen-api-doc
$(SPHINXBUILD) -b changes $(ALLSPHINXOPTS) $(BUILDDIR)/changes
@echo
@echo "The overview file is in $(BUILDDIR)/changes."
.PHONY: linkcheck
linkcheck: regen-api-doc
$(SPHINXBUILD) -b linkcheck $(ALLSPHINXOPTS) $(BUILDDIR)/linkcheck
@echo
@echo "Link check complete; look for any errors in the above output " \
"or in $(BUILDDIR)/linkcheck/output.txt."
.PHONY: doctest
doctest: regen-api-doc
$(SPHINXBUILD) -b doctest $(ALLSPHINXOPTS) $(BUILDDIR)/doctest
@echo "Testing of doctests in the sources finished, look at the " \
"results in $(BUILDDIR)/doctest/output.txt."
.PHONY: coverage
coverage: regen-api-doc
$(SPHINXBUILD) -b coverage $(ALLSPHINXOPTS) $(BUILDDIR)/coverage
@echo "Testing of coverage in the sources finished, look at the " \
"results in $(BUILDDIR)/coverage/python.txt."
.PHONY: xml
xml: regen-api-doc
$(SPHINXBUILD) -b xml $(ALLSPHINXOPTS) $(BUILDDIR)/xml
@echo
@echo "Build finished. The XML files are in $(BUILDDIR)/xml."
.PHONY: pseudoxml
pseudoxml: regen-api-doc
$(SPHINXBUILD) -b pseudoxml $(ALLSPHINXOPTS) $(BUILDDIR)/pseudoxml
@echo
@echo "Build finished. The pseudo-XML files are in $(BUILDDIR)/pseudoxml."
.PHONY: dummy
dummy: regen-api-doc
$(SPHINXBUILD) -b dummy $(ALLSPHINXOPTS) $(BUILDDIR)/dummy
@echo
@echo "Build finished. Dummy builder generates no files."
+340
View File
File diff suppressed because it is too large Load Diff
+22
View File
@@ -0,0 +1,22 @@
.. VPP test framework documentation master file, created by
sphinx-quickstart on Thu Oct 13 08:45:03 2016.
You can adapt this file completely to your liking, but it should at least
contain the root `toctree` directive.
Welcome to VPP test framework's documentation!
==============================================
Contents:
.. toctree::
:maxdepth: 2
modules.rst
Indices and tables
==================
* :ref:`genindex`
* :ref:`modindex`
* :ref:`search`
+361 -470
View File
File diff suppressed because it is too large Load Diff
+128
View File
@@ -0,0 +1,128 @@
import signal
import os
import pexpect
from logging import *
class Hook(object):
"""
Generic hooks before/after API/CLI calls
"""
def before_api(self, api_name, api_args):
"""
Function called before API call
Emit a debug message describing the API name and arguments
@param api_name: name of the API
@param api_args: tuple containing the API arguments
"""
debug("API: %s (%s)" % (api_name, api_args))
def after_api(self, api_name, api_args):
"""
Function called after API call
@param api_name: name of the API
@param api_args: tuple containing the API arguments
"""
pass
def before_cli(self, cli):
"""
Function called before CLI call
Emit a debug message describing the CLI
@param cli: CLI string
"""
debug("CLI: %s" % (cli))
def after_cli(self, cli):
"""
Function called after CLI call
"""
pass
class VppDiedError(Exception):
pass
class PollHook(Hook):
""" Hook which checks if the vpp subprocess is alive """
def __init__(self, testcase):
self.vpp_dead = False
self.testcase = testcase
def spawn_gdb(self, gdb_path, core_path):
gdb_cmdline = gdb_path + ' ' + self.testcase.vpp_bin + ' ' + core_path
gdb = pexpect.spawn(gdb_cmdline)
gdb.interact()
try:
gdb.terminate(True)
except:
pass
if gdb.isalive():
raise Exception("GDB refused to die...")
def on_crash(self, core_path):
if self.testcase.interactive:
gdb_path = '/usr/bin/gdb'
if os.path.isfile(gdb_path) and os.access(gdb_path, os.X_OK):
# automatically attach gdb
self.spawn_gdb(gdb_path, core_path)
return
else:
error("Debugger '%s' does not exist or is not an executable.." %
gdb_path)
critical('core file present, debug with: gdb ' +
self.testcase.vpp_bin + ' ' + core_path)
def poll_vpp(self):
"""
Poll the vpp status and throw an exception if it's not running
:raises VppDiedError: exception if VPP is not running anymore
"""
if self.vpp_dead:
# already dead, nothing to do
return
self.testcase.vpp.poll()
if self.testcase.vpp.returncode is not None:
signaldict = dict(
(k, v) for v, k in reversed(sorted(signal.__dict__.items()))
if v.startswith('SIG') and not v.startswith('SIG_'))
msg = "VPP subprocess died unexpectedly with returncode %d [%s]" % (
self.testcase.vpp.returncode,
signaldict[abs(self.testcase.vpp.returncode)])
critical(msg)
core_path = self.testcase.tempdir + '/core'
if os.path.isfile(core_path):
self.on_crash(core_path)
self.testcase.vpp_dead = True
raise VppDiedError(msg)
def after_api(self, api_name, api_args):
"""
Check if VPP died after executing an API
:param api_name: name of the API
:param api_args: tuple containing the API arguments
:raises VppDiedError: exception if VPP is not running anymore
"""
super(PollHook, self).after_api(api_name, api_args)
self.poll_vpp()
def after_cli(self, cli):
"""
Check if VPP died after executing a CLI
:param cli: CLI string
:raises Exception: exception if VPP is not running anymore
"""
super(PollHook, self).after_cli(cli)
self.poll_vpp()
+54 -60
View File
@@ -7,100 +7,94 @@ from scapy.layers.inet import IP, UDP
class BridgeDomain(object):
def __init__(self):
## Ethernet frame which is send to pg0 interface and is forwarded to pg1
self.payload_0_1 = (
Ether(src='00:00:00:00:00:01', dst='00:00:00:00:00:02') /
IP(src='1.2.3.4', dst='4.3.2.1') /
UDP(sport=10000, dport=20000) /
Raw('\xa5' * 100))
""" Bridge domain abstraction """
## Ethernet frame which is send to pg1 interface and is forwarded to pg0
self.payload_1_0 = (
Ether(src='00:00:00:00:00:02', dst='00:00:00:00:00:01') /
IP(src='4.3.2.1', dst='1.2.3.4') /
UDP(sport=20000, dport=10000) /
Raw('\xa5' * 100))
@property
def frame_pg0_to_pg1(self):
""" Ethernet frame sent from pg0 and expected to arrive at pg1 """
return (Ether(src='00:00:00:00:00:01', dst='00:00:00:00:00:02') /
IP(src='1.2.3.4', dst='4.3.2.1') /
UDP(sport=10000, dport=20000) /
Raw('\xa5' * 100))
@property
def frame_pg1_to_pg0(self):
""" Ethernet frame sent from pg1 and expected to arrive at pg0 """
return (Ether(src='00:00:00:00:00:02', dst='00:00:00:00:00:01') /
IP(src='4.3.2.1', dst='1.2.3.4') /
UDP(sport=20000, dport=10000) /
Raw('\xa5' * 100))
## Test case must implement this method, so template known how to send
# encapsulated frame.
@abstractmethod
def encapsulate(self, pkt):
""" Encapsulate packet """
pass
## Test case must implement this method, so template known how to get
# original payload.
@abstractmethod
def decapsulate(self, pkt):
""" Decapsulate packet """
pass
## Test case must implement this method, so template known how if the
# received frame is corectly encapsulated.
@abstractmethod
def check_encapsulation(self, pkt):
""" Verify the encapsulation """
pass
## On pg0 interface are encapsulated frames, on pg1 are testing frames
# without encapsulation
def test_decap(self):
## Prepare Ethernet frame that will be send encapsulated.
pkt_to_send = self.encapsulate(self.payload_0_1)
""" Decapsulation test
Send encapsulated frames from pg0
Verify receipt of decapsulated frames on pg1
"""
## Add packet to list of packets.
self.pg_add_stream(0, [pkt_to_send, ])
encapsulated_pkt = self.encapsulate(self.frame_pg0_to_pg1)
## Enable Packet Capture on both ports.
self.pg_enable_capture([0, 1])
self.pg0.add_stream([encapsulated_pkt, ])
self.pg1.enable_capture()
## Start all streams
self.pg_start()
## Pick first received frame and check if is same as non-encapsulated
# frame.
out = self.pg_get_capture(1)
# Pick first received frame and check if it's the non-encapsulated frame
out = self.pg1.get_capture()
self.assertEqual(len(out), 1,
'Invalid number of packets on '
'output: {}'.format(len(out)))
pkt = out[0]
# TODO: add error messages
self.assertEqual(pkt[Ether].src, self.payload_0_1[Ether].src)
self.assertEqual(pkt[Ether].dst, self.payload_0_1[Ether].dst)
self.assertEqual(pkt[IP].src, self.payload_0_1[IP].src)
self.assertEqual(pkt[IP].dst, self.payload_0_1[IP].dst)
self.assertEqual(pkt[UDP].sport, self.payload_0_1[UDP].sport)
self.assertEqual(pkt[UDP].dport, self.payload_0_1[UDP].dport)
self.assertEqual(pkt[Raw], self.payload_0_1[Raw])
self.assertEqual(pkt[Ether].src, self.frame_pg0_to_pg1[Ether].src)
self.assertEqual(pkt[Ether].dst, self.frame_pg0_to_pg1[Ether].dst)
self.assertEqual(pkt[IP].src, self.frame_pg0_to_pg1[IP].src)
self.assertEqual(pkt[IP].dst, self.frame_pg0_to_pg1[IP].dst)
self.assertEqual(pkt[UDP].sport, self.frame_pg0_to_pg1[UDP].sport)
self.assertEqual(pkt[UDP].dport, self.frame_pg0_to_pg1[UDP].dport)
self.assertEqual(pkt[Raw], self.frame_pg0_to_pg1[Raw])
## Send non-encapsulated Ethernet frame from pg1 interface and expect
# encapsulated frame on pg0. On pg0 interface are encapsulated frames,
# on pg1 are testing frames without encapsulation.
def test_encap(self):
## Create packet generator stream.
self.pg_add_stream(1, [self.payload_1_0])
""" Encapsulation test
Send frames from pg1
Verify receipt of encapsulated frames on pg0
"""
self.pg1.add_stream([self.frame_pg1_to_pg0])
## Enable Packet Capture on both ports.
self.pg_enable_capture([0, 1])
self.pg0.enable_capture()
## Start all streams.
self.pg_start()
## Pick first received frame and check if is corectly encapsulated.
out = self.pg_get_capture(0)
# Pick first received frame and check if it's corectly encapsulated.
out = self.pg0.get_capture()
self.assertEqual(len(out), 1,
'Invalid number of packets on '
'output: {}'.format(len(out)))
rcvd = out[0]
self.check_encapsulation(rcvd)
pkt = out[0]
self.check_encapsulation(pkt)
## Get original frame from received packet and check if is same as
# sended frame.
rcvd_payload = self.decapsulate(rcvd)
payload = self.decapsulate(pkt)
# TODO: add error messages
self.assertEqual(rcvd_payload[Ether].src, self.payload_1_0[Ether].src)
self.assertEqual(rcvd_payload[Ether].dst, self.payload_1_0[Ether].dst)
self.assertEqual(rcvd_payload[IP].src, self.payload_1_0[IP].src)
self.assertEqual(rcvd_payload[IP].dst, self.payload_1_0[IP].dst)
self.assertEqual(rcvd_payload[UDP].sport, self.payload_1_0[UDP].sport)
self.assertEqual(rcvd_payload[UDP].dport, self.payload_1_0[UDP].dport)
self.assertEqual(rcvd_payload[Raw], self.payload_1_0[Raw])
self.assertEqual(payload[Ether].src, self.frame_pg1_to_pg0[Ether].src)
self.assertEqual(payload[Ether].dst, self.frame_pg1_to_pg0[Ether].dst)
self.assertEqual(payload[IP].src, self.frame_pg1_to_pg0[IP].src)
self.assertEqual(payload[IP].dst, self.frame_pg1_to_pg0[IP].dst)
self.assertEqual(payload[UDP].sport, self.frame_pg1_to_pg0[UDP].sport)
self.assertEqual(payload[UDP].dport, self.frame_pg1_to_pg0[UDP].dport)
self.assertEqual(payload[Raw], self.frame_pg1_to_pg0[Raw])
+101 -197
View File
File diff suppressed because it is too large Load Diff
+102 -217
View File
File diff suppressed because it is too large Load Diff
+147 -402
View File
File diff suppressed because it is too large Load Diff
+102 -166
View File
File diff suppressed because it is too large Load Diff
+129 -113
View File
File diff suppressed because it is too large Load Diff
+52 -54
View File
@@ -1,8 +1,8 @@
#!/usr/bin/env python
import unittest
from logging import *
from framework import VppTestCase, VppTestRunner
from util import Util
from template_bd import BridgeDomain
from scapy.layers.l2 import Ether
@@ -10,61 +10,49 @@ from scapy.layers.inet import IP, UDP
from scapy_handlers.vxlan import VXLAN
## TestVxlan is a subclass of BridgeDomain, Util, VppTestCase classes.
#
# TestVxlan class defines VXLAN test cases for VXLAN encapsulation,
# decapsulation and VXLAN tunnel termination in L2 bridge-domain.
class TestVxlan(BridgeDomain, Util, VppTestCase):
class TestVxlan(BridgeDomain, VppTestCase):
""" VXLAN Test Case """
## Method to initialize all parent classes.
#
# Initialize BridgeDomain objects, set documentation string for inherited
# tests and initialize VppTestCase object which must be called after
# doc strings are set.
def __init__(self, *args):
BridgeDomain.__init__(self)
self.test_decap.__func__.__doc__ = ' VXLAN BD decapsulation '
self.test_encap.__func__.__doc__ = ' VXLAN BD encapsulation '
VppTestCase.__init__(self, *args)
## Method for VXLAN encapsulate function.
#
# Encapsulate the original payload frame by adding VXLAN header with its
# UDP, IP and Ethernet fields.
def encapsulate(self, pkt):
return (Ether(src=self.MY_MACS[0], dst=self.VPP_MACS[0]) /
IP(src=self.MY_IP4S[0], dst=self.VPP_IP4S[0]) /
UDP(sport=4789, dport=4789, chksum=0) /
VXLAN(vni=1) /
"""
Encapsulate the original payload frame by adding VXLAN header with its
UDP, IP and Ethernet fields
"""
return (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
IP(src=self.pg0.remote_ip4, dst=self.pg0.local_ip4) /
UDP(sport=self.dport, dport=self.dport, chksum=0) /
VXLAN(vni=self.vni) /
pkt)
## Method for VXLAN decapsulate function.
#
# Decapsulate the original payload frame by removing VXLAN header with
# its UDP, IP and Ethernet fields.
def decapsulate(self, pkt):
"""
Decapsulate the original payload frame by removing VXLAN header
"""
return pkt[VXLAN].payload
## Method for checking VXLAN encapsulation.
# Method for checking VXLAN encapsulation.
#
def check_encapsulation(self, pkt):
# TODO: add error messages
## Verify source MAC is VPP_MAC and destination MAC is MY_MAC resolved
# Verify source MAC is VPP_MAC and destination MAC is MY_MAC resolved
# by VPP using ARP.
self.assertEqual(pkt[Ether].src, self.VPP_MACS[0])
self.assertEqual(pkt[Ether].dst, self.MY_MACS[0])
## Verify VXLAN tunnel source IP is VPP_IP and destination IP is MY_IP.
self.assertEqual(pkt[IP].src, self.VPP_IP4S[0])
self.assertEqual(pkt[IP].dst, self.MY_IP4S[0])
## Verify UDP destination port is VXLAN 4789, source UDP port could be
self.assertEqual(pkt[Ether].src, self.pg0.local_mac)
self.assertEqual(pkt[Ether].dst, self.pg0.remote_mac)
# Verify VXLAN tunnel source IP is VPP_IP and destination IP is MY_IP.
self.assertEqual(pkt[IP].src, self.pg0.local_ip4)
self.assertEqual(pkt[IP].dst, self.pg0.remote_ip4)
# Verify UDP destination port is VXLAN 4789, source UDP port could be
# arbitrary.
self.assertEqual(pkt[UDP].dport, 4789)
self.assertEqual(pkt[UDP].dport, type(self).dport)
# TODO: checksum check
## Verify VNI, based on configuration it must be 1.
self.assertEqual(pkt[VXLAN].vni, 1)
# Verify VNI, based on configuration it must be 1.
self.assertEqual(pkt[VXLAN].vni, type(self).vni)
## Class method to start the VXLAN test case.
# Class method to start the VXLAN test case.
# Overrides setUpClass method in VppTestCase class.
# Python try..except statement is used to ensure that the tear down of
# the class will be executed even if exception is raised.
@@ -72,31 +60,41 @@ class TestVxlan(BridgeDomain, Util, VppTestCase):
@classmethod
def setUpClass(cls):
super(TestVxlan, cls).setUpClass()
try:
## Create 2 pg interfaces.
cls.create_interfaces(range(2))
## Configure IPv4 addresses on VPP pg0.
cls.config_ip4([0])
## Resolve MAC address for VPP's IP address on pg0.
cls.resolve_arp([0])
## Create VXLAN VTEP on VPP pg0, and put vxlan_tunnel0 and pg1
try:
cls.dport = 4789
cls.vni = 1
# Create 2 pg interfaces.
cls.create_pg_interfaces(range(2))
cls.pg0.admin_up()
cls.pg1.admin_up()
# Configure IPv4 addresses on VPP pg0.
cls.pg0.config_ip4()
# Resolve MAC address for VPP's IP address on pg0.
cls.pg0.resolve_arp()
# Create VXLAN VTEP on VPP pg0, and put vxlan_tunnel0 and pg1
# into BD.
cls.api("vxlan_add_del_tunnel src %s dst %s vni 1" %
(cls.VPP_IP4S[0], cls.MY_IP4S[0]))
cls.api("sw_interface_set_l2_bridge vxlan_tunnel0 bd_id 1")
cls.api("sw_interface_set_l2_bridge pg1 bd_id 1")
except:
## In case setUpClass fails run tear down.
cls.tearDownClass()
r = cls.vapi.vxlan_add_del_tunnel(
src_addr=cls.pg0.local_ip4n,
dst_addr=cls.pg0.remote_ip4n,
vni=cls.vni)
cls.vapi.sw_interface_set_l2_bridge(r.sw_if_index, bd_id=1)
cls.vapi.sw_interface_set_l2_bridge(cls.pg1.sw_if_index, bd_id=1)
except Exception:
super(TestVxlan, cls).tearDownClass()
raise
## Method to define VPP actions before tear down of the test case.
# Method to define VPP actions before tear down of the test case.
# Overrides tearDown method in VppTestCase class.
# @param self The object pointer.
def tearDown(self):
super(TestVxlan, self).tearDown()
self.cli(2, "show bridge-domain 1 detail")
if not self.vpp_dead:
info(self.vapi.cli("show bridge-domain 1 detail"))
if __name__ == '__main__':
unittest.main(testRunner=VppTestRunner)
+19 -133
View File
@@ -1,139 +1,25 @@
## @package util
# Module with common functions that should be used by the test cases.
#
# The module provides a set of tools for setup the test environment
from scapy.layers.l2 import Ether, ARP
from scapy.layers.inet6 import IPv6, ICMPv6ND_NS, ICMPv6NDOptSrcLLAddr
from logging import *
## Util class
#
# Test cases that want to use methods defined in Util class should
# inherit this class.
#
# class Example(Util, VppTestCase):
# pass
class Util(object):
class TestHost(object):
""" Generic test host "connected" to VPP. """
## Class method to send ARP Request for each VPP IPv4 address in
# order to determine VPP interface MAC address to IPv4 bindings.
#
# Resolved MAC address is saved to the VPP_MACS dictionary with interface
# index as a key. ARP Request is sent from MAC in MY_MACS dictionary with
# interface index as a key.
# @param cls The class pointer.
# @param args List variable to store indices of VPP interfaces.
@classmethod
def resolve_arp(cls, args):
for i in args:
ip = cls.VPP_IP4S[i]
cls.log("Sending ARP request for %s on port %u" % (ip, i))
arp_req = (Ether(dst="ff:ff:ff:ff:ff:ff", src=cls.MY_MACS[i]) /
ARP(op=ARP.who_has, pdst=ip,
psrc=cls.MY_IP4S[i], hwsrc=cls.MY_MACS[i]))
cls.pg_add_stream(i, arp_req)
cls.pg_enable_capture([i])
@property
def mac(self):
""" MAC address """
return self._mac
cls.cli(2, "trace add pg-input 1")
cls.pg_start()
arp_reply = cls.pg_get_capture(i)[0]
if arp_reply[ARP].op == ARP.is_at:
cls.log("VPP pg%u MAC address is %s " % (i, arp_reply[ARP].hwsrc))
cls.VPP_MACS[i] = arp_reply[ARP].hwsrc
else:
cls.log("No ARP received on port %u" % i)
cls.cli(2, "show trace")
## @var ip
# <TODO add description>
## @var arp_req
# <TODO add description>
## @var arp_reply
# <TODO add description>
## @var VPP_MACS
# <TODO add description>
@property
def ip4(self):
""" IPv4 address """
return self._ip4
## Class method to send ND request for each VPP IPv6 address in
# order to determine VPP MAC address to IPv6 bindings.
#
# Resolved MAC address is saved to the VPP_MACS dictionary with interface
# index as a key. ND Request is sent from MAC in MY_MACS dictionary with
# interface index as a key.
# @param cls The class pointer.
# @param args List variable to store indices of VPP interfaces.
@classmethod
def resolve_icmpv6_nd(cls, args):
for i in args:
ip = cls.VPP_IP6S[i]
cls.log("Sending ICMPv6ND_NS request for %s on port %u" % (ip, i))
nd_req = (Ether(dst="ff:ff:ff:ff:ff:ff", src=cls.MY_MACS[i]) /
IPv6(src=cls.MY_IP6S[i], dst=ip) /
ICMPv6ND_NS(tgt=ip) /
ICMPv6NDOptSrcLLAddr(lladdr=cls.MY_MACS[i]))
cls.pg_add_stream(i, nd_req)
cls.pg_enable_capture([i])
@property
def ip6(self):
""" IPv6 address """
return self._ip6
cls.cli(2, "trace add pg-input 1")
cls.pg_start()
nd_reply = cls.pg_get_capture(i)[0]
icmpv6_na = nd_reply['ICMPv6 Neighbor Discovery - Neighbor Advertisement']
dst_ll_addr = icmpv6_na['ICMPv6 Neighbor Discovery Option - Destination Link-Layer Address']
cls.VPP_MACS[i] = dst_ll_addr.lladdr
## @var ip
# <TODO add description>
## @var nd_req
# <TODO add description>
## @var nd_reply
# <TODO add description>
## @var icmpv6_na
# <TODO add description>
## @var dst_ll_addr
# <TODO add description>
## @var VPP_MACS
# <TODO add description>
## Class method to configure IPv4 addresses on VPP interfaces.
#
# Set dictionary variables MY_IP4S and VPP_IP4S to IPv4 addresses
# calculated using interface VPP interface index as a parameter.
# /24 IPv4 prefix is used, with VPP interface address host part set
# to .1 and MY address set to .2.
# Used IPv4 prefix scheme: 172.16.{VPP-interface-index}.0/24.
# @param cls The class pointer.
# @param args List variable to store indices of VPP interfaces.
@classmethod
def config_ip4(cls, args):
for i in args:
cls.MY_IP4S[i] = "172.16.%u.2" % i
cls.VPP_IP4S[i] = "172.16.%u.1" % i
cls.api("sw_interface_add_del_address pg%u %s/24" % (i, cls.VPP_IP4S[i]))
cls.log("My IPv4 address is %s" % (cls.MY_IP4S[i]))
## @var MY_IP4S
# Dictionary variable to store host IPv4 addresses connected to packet
# generator interfaces.
## @var VPP_IP4S
# Dictionary variable to store VPP IPv4 addresses of the packet
# generator interfaces.
## Class method to configure IPv6 addresses on VPP interfaces.
#
# Set dictionary variables MY_IP6S and VPP_IP6S to IPv6 addresses
# calculated using interface VPP interface index as a parameter.
# /64 IPv6 prefix is used, with VPP interface address host part set
# to ::1 and MY address set to ::2.
# Used IPv6 prefix scheme: fd10:{VPP-interface-index}::0/64.
# @param cls The class pointer.
# @param args List variable to store indices of VPP interfaces.
@classmethod
def config_ip6(cls, args):
for i in args:
cls.MY_IP6S[i] = "fd10:%u::2" % i
cls.VPP_IP6S[i] = "fd10:%u::1" % i
cls.api("sw_interface_add_del_address pg%u %s/64" % (i, cls.VPP_IP6S[i]))
cls.log("My IPv6 address is %s" % (cls.MY_IP6S[i]))
## @var MY_IP6S
# Dictionary variable to store host IPv6 addresses connected to packet
# generator interfaces.
## @var VPP_IP6S
# Dictionary variable to store VPP IPv6 addresses of the packet
# generator interfaces.
def __init__(self, mac=None, ip4=None, ip6=None):
self._mac = mac
self._ip4 = ip4
self._ip6 = ip6
+240
View File
@@ -0,0 +1,240 @@
from abc import abstractmethod, ABCMeta
import socket
from logging import info, error
from scapy.layers.l2 import Ether, ARP
from scapy.layers.inet6 import IPv6, ICMPv6ND_NS, ICMPv6ND_NA, ICMPv6NDOptSrcLLAddr, ICMPv6NDOptDstLLAddr
class VppInterface(object):
"""
Generic VPP interface
"""
__metaclass__ = ABCMeta
@property
def sw_if_index(self):
"""Interface index assigned by VPP"""
return self._sw_if_index
@property
def remote_mac(self):
"""MAC-address of the remote interface "connected" to this interface"""
return self._remote_mac
@property
def local_mac(self):
"""MAC-address of the VPP interface"""
return self._local_mac
@property
def local_ip4(self):
"""Local IPv4 address on VPP interface (string)"""
return self._local_ip4
@property
def local_ip4n(self):
"""Local IPv4 address - raw, suitable as API parameter"""
return self._local_ip4n
@property
def remote_ip4(self):
"""IPv4 address of remote peer "connected" to this interface"""
return self._remote_ip4
@property
def remote_ip4n(self):
"""IPv4 address of remote peer - raw, suitable as API parameter"""
return self._remote_ip4n
@property
def local_ip6(self):
"""Local IPv6 address on VPP interface (string)"""
return self._local_ip6
@property
def local_ip6n(self):
"""Local IPv6 address - raw, suitable as API parameter"""
return self._local_ip6n
@property
def remote_ip6(self):
"""IPv6 address of remote peer "connected" to this interface"""
return self._remote_ip6
@property
def remote_ip6n(self):
"""IPv6 address of remote peer - raw, suitable as API parameter"""
return self._remote_ip6n
@property
def name(self):
"""Name of the interface"""
return self._name
@property
def dump(self):
"""Raw result of sw_interface_dump for this interface"""
return self._dump
@property
def test(self):
"""Test case creating this interface"""
return self._test
def post_init_setup(self):
"""Additional setup run after creating an interface object"""
self._remote_mac = "02:00:00:00:ff:%02x" % self.sw_if_index
self._local_ip4 = "172.16.%u.1" % self.sw_if_index
self._local_ip4n = socket.inet_pton(socket.AF_INET, self.local_ip4)
self._remote_ip4 = "172.16.%u.2" % self.sw_if_index
self._remote_ip4n = socket.inet_pton(socket.AF_INET, self.remote_ip4)
self._local_ip6 = "fd01:%u::1" % self.sw_if_index
self._local_ip6n = socket.inet_pton(socket.AF_INET6, self.local_ip6)
self._remote_ip6 = "fd01:%u::2" % self.sw_if_index
self._remote_ip6n = socket.inet_pton(socket.AF_INET6, self.remote_ip6)
r = self.test.vapi.sw_interface_dump()
for intf in r:
if intf.sw_if_index == self.sw_if_index:
self._name = intf.interface_name.split(b'\0', 1)[0]
self._local_mac = ':'.join(intf.l2_address.encode('hex')[i:i + 2]
for i in range(0, 12, 2))
self._dump = intf
break
else:
raise Exception(
"Could not find interface with sw_if_index %d "
"in interface dump %s" %
(self.sw_if_index, repr(r)))
@abstractmethod
def __init__(self, test, index):
self._test = test
self.post_init_setup()
info("New %s, MAC=%s, remote_ip4=%s, local_ip4=%s" %
(self.__name__, self.remote_mac, self.remote_ip4, self.local_ip4))
def config_ip4(self):
"""Configure IPv4 address on the VPP interface"""
addr = self.local_ip4n
addr_len = 24
self.test.vapi.sw_interface_add_del_address(
self.sw_if_index, addr, addr_len)
def config_ip6(self):
"""Configure IPv6 address on the VPP interface"""
addr = self._local_ip6n
addr_len = 64
self.test.vapi.sw_interface_add_del_address(
self.sw_if_index, addr, addr_len, is_ipv6=1)
def disable_ipv6_ra(self):
"""Configure IPv6 RA suppress on the VPP interface"""
self.test.vapi.sw_interface_ra_suppress(self.sw_if_index)
def create_arp_req(self):
"""Create ARP request applicable for this interface"""
return (Ether(dst="ff:ff:ff:ff:ff:ff", src=self.remote_mac) /
ARP(op=ARP.who_has, pdst=self.local_ip4,
psrc=self.remote_ip4, hwsrc=self.remote_mac))
def create_ndp_req(self):
return (Ether(dst="ff:ff:ff:ff:ff:ff", src=self.remote_mac) /
IPv6(src=self.remote_ip6, dst=self.local_ip6) /
ICMPv6ND_NS(tgt=self.local_ip6) /
ICMPv6NDOptSrcLLAddr(lladdr=self.remote_mac))
def resolve_arp(self, pg_interface=None):
"""Resolve ARP using provided packet-generator interface
:param pg_interface: interface used to resolve, if None then this
interface is used
"""
if pg_interface is None:
pg_interface = self
info("Sending ARP request for %s on port %s" %
(self.local_ip4, pg_interface.name))
arp_req = self.create_arp_req()
pg_interface.add_stream(arp_req)
pg_interface.enable_capture()
self.test.pg_start()
info(self.test.vapi.cli("show trace"))
arp_reply = pg_interface.get_capture()
if arp_reply is None or len(arp_reply) == 0:
info("No ARP received on port %s" % pg_interface.name)
return
arp_reply = arp_reply[0]
# Make Dot1AD packet content recognizable to scapy
if arp_reply.type == 0x88a8:
arp_reply.type = 0x8100
arp_reply = Ether(str(arp_reply))
try:
if arp_reply[ARP].op == ARP.is_at:
info("VPP %s MAC address is %s " %
(self.name, arp_reply[ARP].hwsrc))
self._local_mac = arp_reply[ARP].hwsrc
else:
info("No ARP received on port %s" % pg_interface.name)
except:
error("Unexpected response to ARP request:")
error(arp_reply.show())
raise
def resolve_ndp(self, pg_interface=None):
"""Resolve NDP using provided packet-generator interface
:param pg_interface: interface used to resolve, if None then this
interface is used
"""
if pg_interface is None:
pg_interface = self
info("Sending NDP request for %s on port %s" %
(self.local_ip6, pg_interface.name))
ndp_req = self.create_ndp_req()
pg_interface.add_stream(ndp_req)
pg_interface.enable_capture()
self.test.pg_start()
info(self.test.vapi.cli("show trace"))
ndp_reply = pg_interface.get_capture()
if ndp_reply is None or len(ndp_reply) == 0:
info("No NDP received on port %s" % pg_interface.name)
return
ndp_reply = ndp_reply[0]
# Make Dot1AD packet content recognizable to scapy
if ndp_reply.type == 0x88a8:
ndp_reply.type = 0x8100
ndp_reply = Ether(str(ndp_reply))
try:
ndp_na = ndp_reply[ICMPv6ND_NA]
opt = ndp_na[ICMPv6NDOptDstLLAddr]
info("VPP %s MAC address is %s " %
(self.name, opt.lladdr))
self._local_mac = opt.lladdr
except:
error("Unexpected response to NDP request:")
error(ndp_reply.show())
raise
def admin_up(self):
""" Put interface ADMIN-UP """
self.test.vapi.sw_interface_set_flags(self.sw_if_index, admin_up_down=1)
def add_sub_if(self, sub_if):
"""
Register a sub-interface with this interface
:param sub_if: sub-interface
"""
if not hasattr(self, 'sub_if'):
self.sub_if = sub_if
else:
if isinstance(self.sub_if, list):
self.sub_if.append(sub_if)
else:
self.sub_if = sub_if
File diff suppressed because it is too large Load Diff
+99
View File
@@ -0,0 +1,99 @@
import os
from logging import error
from scapy.utils import wrpcap, rdpcap
from vpp_interface import VppInterface
class VppPGInterface(VppInterface):
"""
VPP packet-generator interface
"""
@property
def pg_index(self):
"""packet-generator interface index assigned by VPP"""
return self._pg_index
@property
def out_path(self):
"""pcap file path - captured packets"""
return self._out_path
@property
def in_path(self):
""" pcap file path - injected packets"""
return self._in_path
@property
def capture_cli(self):
"""CLI string to start capture on this interface"""
return self._capture_cli
@property
def cap_name(self):
"""capture name for this interface"""
return self._cap_name
@property
def input_cli(self):
"""CLI string to load the injected packets"""
return self._input_cli
def post_init_setup(self):
""" Perform post-init setup for super class and add our own setup """
super(VppPGInterface, self).post_init_setup()
self._out_path = self.test.tempdir + "/pg%u_out.pcap" % self.sw_if_index
self._in_path = self.test.tempdir + "/pg%u_in.pcap" % self.sw_if_index
self._capture_cli = "packet-generator capture pg%u pcap %s" % (
self.pg_index, self.out_path)
self._cap_name = "pcap%u" % self.sw_if_index
self._input_cli = "packet-generator new pcap %s source pg%u name %s" % (
self.in_path, self.pg_index, self.cap_name)
def __init__(self, test, pg_index):
""" Create VPP packet-generator interface """
self._pg_index = pg_index
self._test = test
r = self.test.vapi.pg_create_interface(self.pg_index)
self._sw_if_index = r.sw_if_index
self.post_init_setup()
def enable_capture(self):
""" Enable capture on this packet-generator interface"""
try:
os.unlink(self.out_path)
except:
pass
# FIXME this should be an API, but no such exists atm
self.test.vapi.cli(self.capture_cli)
def add_stream(self, pkts):
"""
Add a stream of packets to this packet-generator
:param pkts: iterable packets
"""
try:
os.remove(self.in_path)
except:
pass
wrpcap(self.in_path, pkts)
# FIXME this should be an API, but no such exists atm
self.test.vapi.cli(self.input_cli)
self.test.pg_streams.append(self.cap_name)
self.test.vapi.cli("trace add pg-input %d" % len(pkts))
def get_capture(self):
"""
Get captured packets
:returns: iterable packets
"""
try:
output = rdpcap(self.out_path)
except IOError: # TODO
error("File %s does not exist, probably because no"
" packets arrived" % self.out_path)
return []
return output
+143
View File
@@ -0,0 +1,143 @@
from scapy.layers.l2 import Ether, Dot1Q
from abc import abstractmethod, ABCMeta
from vpp_interface import VppInterface
class VppSubInterface(VppInterface):
__metaclass__ = ABCMeta
@property
def parent(self):
"""Parent interface for this sub-interface"""
return self._parent
@property
def sub_id(self):
"""Sub-interface ID"""
return self._sub_id
def __init__(self, test, parent, sub_id):
self._test = test
self._parent = parent
self._parent.add_sub_if(self)
self._sub_id = sub_id
@abstractmethod
def create_arp_req(self):
pass
@abstractmethod
def create_ndp_req(self):
pass
def resolve_arp(self):
super(VppSubInterface, self).resolve_arp(self.parent)
def resolve_ndp(self):
super(VppSubInterface, self).resolve_ndp(self.parent)
@abstractmethod
def add_dot1_layer(self, pkt):
pass
class VppDot1QSubint(VppSubInterface):
@property
def vlan(self):
"""VLAN tag"""
return self._vlan
def __init__(self, test, parent, sub_id, vlan=None):
if vlan is None:
vlan = sub_id
super(VppDot1QSubint, self).__init__(test, parent, sub_id)
self._vlan = vlan
r = self.test.vapi.create_vlan_subif(parent.sw_if_index, self.vlan)
self._sw_if_index = r.sw_if_index
self.post_init_setup()
def create_arp_req(self):
packet = VppInterface.create_arp_req(self)
return self.add_dot1_layer(packet)
def create_ndp_req(self):
packet = VppInterface.create_ndp_req(self)
return self.add_dot1_layer(packet)
def add_dot1_layer(self, packet):
payload = packet.payload
packet.remove_payload()
packet.add_payload(Dot1Q(vlan=self.sub_id) / payload)
return packet
def remove_dot1_layer(self, packet):
payload = packet.payload
self.test.instance().assertEqual(type(payload), Dot1Q)
self.test.instance().assertEqual(payload.vlan, self.vlan)
payload = payload.payload
packet.remove_payload()
packet.add_payload(payload)
return packet
class VppDot1ADSubint(VppSubInterface):
@property
def outer_vlan(self):
"""Outer VLAN tag"""
return self._outer_vlan
@property
def inner_vlan(self):
"""Inner VLAN tag"""
return self._inner_vlan
def __init__(self, test, parent, sub_id, outer_vlan, inner_vlan):
super(VppDot1ADSubint, self).__init__(test, parent, sub_id)
self.DOT1AD_TYPE = 0x88A8
self.DOT1Q_TYPE = 0x8100
self._outer_vlan = outer_vlan
self._inner_vlan = inner_vlan
r = self.test.vapi.create_subif(
parent.sw_if_index,
self.sub_id,
self.outer_vlan,
self.inner_vlan,
dot1ad=1,
two_tags=1,
exact_match=1)
self._sw_if_index = r.sw_if_index
self.post_init_setup()
def create_arp_req(self):
packet = VppInterface.create_arp_req(self)
return self.add_dot1_layer(packet)
def create_ndp_req(self):
packet = VppInterface.create_ndp_req(self)
return self.add_dot1_layer(packet)
def add_dot1_layer(self, packet):
payload = packet.payload
packet.remove_payload()
packet.add_payload(Dot1Q(vlan=self.outer_vlan) /
Dot1Q(vlan=self.inner_vlan) / payload)
packet.type = self.DOT1AD_TYPE
return packet
def remove_dot1_layer(self, packet):
self.test.instance().assertEqual(type(packet), Ether)
self.test.instance().assertEqual(packet.type, self.DOT1AD_TYPE)
packet.type = self.DOT1Q_TYPE
packet = Ether(str(packet))
payload = packet.payload
self.test.instance().assertEqual(type(payload), Dot1Q)
self.test.instance().assertEqual(payload.vlan, self.outer_vlan)
payload = payload.payload
self.test.instance().assertEqual(type(payload), Dot1Q)
self.test.instance().assertEqual(payload.vlan, self.inner_vlan)
payload = payload.payload
packet.remove_payload()
packet.add_payload(payload)
return packet