test: new test infrastructure
Change-Id: I73ca19c431743f6b39669c583d9222a6559346ef Signed-off-by: Jan Gelety <jgelety@cisco.com> Signed-off-by: Juraj Sloboda <jsloboda@cisco.com> Signed-off-by: Stefan Kobza <skobza@cisco.com> Signed-off-by: Matej Klotton <mklotton@cisco.com> Signed-off-by: Maciek Konstantynowicz <mkonstan@cisco.com> Signed-off-by: Damjan Marion <damarion@cisco.com>
This commit is contained in:

committed by
Dave Barach

parent
6c3ebcc2bf
commit
f56b77a076
33
Makefile
33
Makefile
@ -58,6 +58,7 @@ endif
|
||||
.PHONY: help bootstrap wipe wipe-release build build-release rebuild rebuild-release
|
||||
.PHONY: run run-release debug debug-release build-vat run-vat pkg-deb pkg-rpm
|
||||
.PHONY: ctags cscope plugins plugins-release build-vpp-api
|
||||
.PHONY: test test-debug retest retest-debug
|
||||
|
||||
help:
|
||||
@echo "Make Targets:"
|
||||
@ -75,6 +76,10 @@ help:
|
||||
@echo " run-release - run release binary"
|
||||
@echo " debug - run debug binary with debugger"
|
||||
@echo " debug-release - run release binary with debugger"
|
||||
@echo " test - build and run functional tests"
|
||||
@echo " test-debug - build and run functional tests (debug build)"
|
||||
@echo " retest - run functional tests"
|
||||
@echo " retest-debug - run functional tests (debug build)"
|
||||
@echo " build-vat - build vpp-api-test tool"
|
||||
@echo " build-vpp-api - build vpp-api"
|
||||
@echo " run-vat - run vpp-api-test tool"
|
||||
@ -98,6 +103,7 @@ help:
|
||||
@echo " startup.conf file is present"
|
||||
@echo " GDB=<path> - gdb binary to use for debugging"
|
||||
@echo " PLATFORM=<name> - target platform. default is vpp"
|
||||
@echo " TEST=<name> - only run specific test"
|
||||
@echo ""
|
||||
@echo "Current Argumernt Values:"
|
||||
@echo " V = $(V)"
|
||||
@ -182,6 +188,33 @@ plugins-release: $(BR)/.bootstrap.ok
|
||||
build-vpp-api: $(BR)/.bootstrap.ok
|
||||
$(call make,$(PLATFORM)_debug,vpp-api-install)
|
||||
|
||||
define test
|
||||
@sudo make -C test \
|
||||
VPP_TEST_BIN=$(BR)/install-$(1)-native/vpp/bin/vpp \
|
||||
VPP_TEST_API_TEST_BIN=$(BR)/install-$(1)-native/vpp-api-test/bin/vpp_api_test \
|
||||
V=$(V) TEST=$(TEST)
|
||||
endef
|
||||
|
||||
test:
|
||||
ifeq ($(OS_ID),ubuntu)
|
||||
@sudo -E apt-get $(CONFIRM) $(FORCE) install python-dev python-scapy
|
||||
endif
|
||||
@make -C $(BR) PLATFORM=vpp_lite TAG=vpp_lite vpp-install vpp-api-test-install
|
||||
$(call test,vpp_lite)
|
||||
|
||||
test-debug:
|
||||
ifeq ($(OS_ID),ubuntu)
|
||||
@sudo -E apt-get $(CONFIRM) $(FORCE) install python-dev python-scapy
|
||||
endif
|
||||
@make -C $(BR) PLATFORM=vpp_lite TAG=vpp_lite_debug vpp-install vpp-api-test-install
|
||||
$(call test,vpp_lite_debug)
|
||||
|
||||
retest:
|
||||
$(call test,vpp_lite)
|
||||
|
||||
retest-debug:
|
||||
$(call test,vpp_lite_debug)
|
||||
|
||||
STARTUP_DIR ?= $(PWD)
|
||||
ifeq ("$(wildcard $(STARTUP_CONF))","")
|
||||
define run
|
||||
|
3
test/Makefile
Normal file
3
test/Makefile
Normal file
@ -0,0 +1,3 @@
|
||||
|
||||
all:
|
||||
@python run_tests.py discover -p test_$(TEST)"*.py"
|
621
test/framework.py
Normal file
621
test/framework.py
Normal file
File diff suppressed because it is too large
Load Diff
12
test/run_tests.py
Normal file
12
test/run_tests.py
Normal file
@ -0,0 +1,12 @@
|
||||
#!/usr/bin/env python
|
||||
|
||||
import os
|
||||
import unittest
|
||||
from framework import VppTestRunner
|
||||
|
||||
if __name__ == '__main__':
|
||||
try:
|
||||
verbose = int(os.getenv("V", 0))
|
||||
except:
|
||||
verbose = 0
|
||||
unittest.main(testRunner=VppTestRunner, module=None, verbosity=verbose)
|
0
test/scapy_handlers/__init__.py
Normal file
0
test/scapy_handlers/__init__.py
Normal file
17
test/scapy_handlers/vxlan.py
Normal file
17
test/scapy_handlers/vxlan.py
Normal file
@ -0,0 +1,17 @@
|
||||
from scapy.fields import BitField, XByteField, X3BytesField
|
||||
from scapy.packet import Packet, bind_layers
|
||||
from scapy.layers.l2 import Ether
|
||||
from scapy.layers.inet import UDP
|
||||
|
||||
|
||||
class VXLAN(Packet):
|
||||
name = "VXLAN"
|
||||
fields_desc = [BitField("flags", 0x08000000, 32),
|
||||
X3BytesField("vni", 0),
|
||||
XByteField("reserved", 0x00)]
|
||||
|
||||
def mysummary(self):
|
||||
return self.sprintf("VXLAN (vni=%VXLAN.vni%)")
|
||||
|
||||
bind_layers(UDP, VXLAN, dport=4789)
|
||||
bind_layers(VXLAN, Ether)
|
106
test/template_bd.py
Normal file
106
test/template_bd.py
Normal file
@ -0,0 +1,106 @@
|
||||
#!/usr/bin/env python
|
||||
|
||||
from abc import abstractmethod
|
||||
|
||||
from scapy.layers.l2 import Ether, Raw
|
||||
from scapy.layers.inet import IP, UDP
|
||||
|
||||
|
||||
class BridgeDomain(object):
|
||||
def __init__(self):
|
||||
## Ethernet frame which is send to pg0 interface and is forwarded to pg1
|
||||
self.payload_0_1 = (
|
||||
Ether(src='00:00:00:00:00:01', dst='00:00:00:00:00:02') /
|
||||
IP(src='1.2.3.4', dst='4.3.2.1') /
|
||||
UDP(sport=10000, dport=20000) /
|
||||
Raw('\xa5' * 100))
|
||||
|
||||
## Ethernet frame which is send to pg1 interface and is forwarded to pg0
|
||||
self.payload_1_0 = (
|
||||
Ether(src='00:00:00:00:00:02', dst='00:00:00:00:00:01') /
|
||||
IP(src='4.3.2.1', dst='1.2.3.4') /
|
||||
UDP(sport=20000, dport=10000) /
|
||||
Raw('\xa5' * 100))
|
||||
|
||||
## Test case must implement this method, so template known how to send
|
||||
# encapsulated frame.
|
||||
@abstractmethod
|
||||
def encapsulate(self, pkt):
|
||||
pass
|
||||
|
||||
## Test case must implement this method, so template known how to get
|
||||
# original payload.
|
||||
@abstractmethod
|
||||
def decapsulate(self, pkt):
|
||||
pass
|
||||
|
||||
## Test case must implement this method, so template known how if the
|
||||
# received frame is corectly encapsulated.
|
||||
@abstractmethod
|
||||
def check_encapsulation(self, pkt):
|
||||
pass
|
||||
|
||||
## On pg0 interface are encapsulated frames, on pg1 are testing frames
|
||||
# without encapsulation
|
||||
def test_decap(self):
|
||||
## Prepare Ethernet frame that will be send encapsulated.
|
||||
pkt_to_send = self.encapsulate(self.payload_0_1)
|
||||
|
||||
## Add packet to list of packets.
|
||||
self.pg_add_stream(0, [pkt_to_send, ])
|
||||
|
||||
## Enable Packet Capture on both ports.
|
||||
self.pg_enable_capture([0, 1])
|
||||
|
||||
## Start all streams
|
||||
self.pg_start()
|
||||
|
||||
## Pick first received frame and check if is same as non-encapsulated
|
||||
# frame.
|
||||
out = self.pg_get_capture(1)
|
||||
self.assertEqual(len(out), 1,
|
||||
'Invalid number of packets on '
|
||||
'output: {}'.format(len(out)))
|
||||
pkt = out[0]
|
||||
|
||||
# TODO: add error messages
|
||||
self.assertEqual(pkt[Ether].src, self.payload_0_1[Ether].src)
|
||||
self.assertEqual(pkt[Ether].dst, self.payload_0_1[Ether].dst)
|
||||
self.assertEqual(pkt[IP].src, self.payload_0_1[IP].src)
|
||||
self.assertEqual(pkt[IP].dst, self.payload_0_1[IP].dst)
|
||||
self.assertEqual(pkt[UDP].sport, self.payload_0_1[UDP].sport)
|
||||
self.assertEqual(pkt[UDP].dport, self.payload_0_1[UDP].dport)
|
||||
self.assertEqual(pkt[Raw], self.payload_0_1[Raw])
|
||||
|
||||
## Send non-encapsulated Ethernet frame from pg1 interface and expect
|
||||
# encapsulated frame on pg0. On pg0 interface are encapsulated frames,
|
||||
# on pg1 are testing frames without encapsulation.
|
||||
def test_encap(self):
|
||||
## Create packet generator stream.
|
||||
self.pg_add_stream(1, [self.payload_1_0])
|
||||
|
||||
## Enable Packet Capture on both ports.
|
||||
self.pg_enable_capture([0, 1])
|
||||
|
||||
## Start all streams.
|
||||
self.pg_start()
|
||||
|
||||
## Pick first received frame and check if is corectly encapsulated.
|
||||
out = self.pg_get_capture(0)
|
||||
self.assertEqual(len(out), 1,
|
||||
'Invalid number of packets on '
|
||||
'output: {}'.format(len(out)))
|
||||
rcvd = out[0]
|
||||
self.check_encapsulation(rcvd)
|
||||
|
||||
## Get original frame from received packet and check if is same as
|
||||
# sended frame.
|
||||
rcvd_payload = self.decapsulate(rcvd)
|
||||
# TODO: add error messages
|
||||
self.assertEqual(rcvd_payload[Ether].src, self.payload_1_0[Ether].src)
|
||||
self.assertEqual(rcvd_payload[Ether].dst, self.payload_1_0[Ether].dst)
|
||||
self.assertEqual(rcvd_payload[IP].src, self.payload_1_0[IP].src)
|
||||
self.assertEqual(rcvd_payload[IP].dst, self.payload_1_0[IP].dst)
|
||||
self.assertEqual(rcvd_payload[UDP].sport, self.payload_1_0[UDP].sport)
|
||||
self.assertEqual(rcvd_payload[UDP].dport, self.payload_1_0[UDP].dport)
|
||||
self.assertEqual(rcvd_payload[Raw], self.payload_1_0[Raw])
|
145
test/test_infra.md
Normal file
145
test/test_infra.md
Normal file
@ -0,0 +1,145 @@
|
||||
# VPP Functional Test Infra
|
||||
|
||||
## Running VPP tests
|
||||
VPP functional tests are triggered by `make test` command run in the git vpp source directory. Following Linux environment variables are used by the current VPP functional test infrastructure:
|
||||
|
||||
- `TEST=<name>` - run only specific test identified by filename `test/test_<name>.py`
|
||||
- `V=[0|1|2]` - set verbosity level. `0` for minimal verbosity, `1` for increased verbosity, `2` for maximum verbosity. Default value is 0.
|
||||
|
||||
Example of running tests:
|
||||
|
||||
```
|
||||
~/src/vpp-test-infra$ make test V=1 TEST=vxlan
|
||||
```
|
||||
|
||||
All tests listed in `test/` directory are run by default. To run selected tests you can set variable TEST when starting tests.
|
||||
|
||||
## Overview
|
||||
The main functionality of the test framework is defined in [framework.py](test/framework.py) file. The implementation of the test framework uses classes and methods from Python module *unittest*.
|
||||
|
||||
Three main classes are defined to support the overall test automation:
|
||||
|
||||
* **class VppTestCase(unittest.TestCase)** - a sub-class of *unittest.TestCase* class. Provides methods to create and run test case. These methods can be divided into 5 groups:
|
||||
1. Methods to control test case setup and tear down:
|
||||
* *def setUpConstants(cls):*
|
||||
* *def setUpClass(cls):*
|
||||
* *def quit(cls):*
|
||||
* *def tearDownClass(cls):*
|
||||
* *def tearDown(self):*
|
||||
* *def setUp(self):*
|
||||
|
||||
2. Methods to create VPP packet generator interfaces:
|
||||
* *def create_interfaces(cls, args):*
|
||||
|
||||
3. Methods to execute VPP commands and print logs in the output (terminal for now):
|
||||
* *def log(cls, s, v=1):*
|
||||
* *def api(cls, s):*
|
||||
* *def cli(cls, v, s):*
|
||||
|
||||
4. Methods to control packet stream generation and capturing:
|
||||
* *def pg_add_stream(cls, i, pkts):*
|
||||
* *def pg_enable_capture(cls, args):*
|
||||
* *def pg_start(cls):*
|
||||
* *def pg_get_capture(cls, o):*
|
||||
|
||||
5. Methods to create and verify packets:
|
||||
* *def extend_packet(packet, size):*
|
||||
* *def add_packet_info_to_list(self, info):*
|
||||
* *def create_packet_info(self, pg_id, target_id):*
|
||||
* *def info_to_payload(info):*
|
||||
* *def payload_to_info(payload):*
|
||||
* *def get_next_packet_info(self, info):*
|
||||
* *def get_next_packet_info_for_interface(self, src_pg, info):*
|
||||
* *def get_next_packet_info_for_interface2(self, src_pg, dst_pg, info):*
|
||||
|
||||
* **class VppTestResult(unittest.TestResult)** - a sub-class of *unittest.TestResult* class. Provides methods to compile information about the tests that have succeeded and the ones that have failed. These methods can be divided into 4 groups:
|
||||
1. Processing test case result:
|
||||
* *def addSuccess(self, test):*
|
||||
* *def addFailure(self, test, err):*
|
||||
* *def addError(self, test, err):*
|
||||
|
||||
2. Processing test case description:
|
||||
* *def getDescription(self, test):*
|
||||
|
||||
3. Processing test case start and stop:
|
||||
* *def startTest(self, test):*
|
||||
* *def stopTest(self, test):*
|
||||
|
||||
4. Printing error and failure information:
|
||||
* *def printErrors(self):*
|
||||
* *def printErrorList(self, flavour, errors):*
|
||||
|
||||
* **class VppTestRunner(unittest.TextTestRunner)** - a sub-class of *unittest.TextTestRunner* class. Provides basic test runner implementation that prints results on standard error stream. Contains one method:
|
||||
* *def run(self, test):*
|
||||
|
||||
In addition [util.py] (test/util.py) file defines number of common methods useful for many test cases. All of these methods are currently contained in one class:
|
||||
|
||||
* **class Util(object)**:
|
||||
* *def resolve_arp(cls, args):*
|
||||
* *def resolve_icmpv6_nd(cls, args):*
|
||||
* *def config_ip4(cls, args):*
|
||||
* *def config_ip6(cls, args):*
|
||||
|
||||
## Interaction with VPP
|
||||
VPP is started from command line as a sub-process during the test case setup phase. Command line attributes to start VPP are stored in class variable *vpp_cmdline*.
|
||||
To get an overview of VPP command line attributes, visit section [Command-line Arguments](https://wiki.fd.io/view/VPP/Command-line_Arguments) on VPP wiki page.
|
||||
|
||||
Current VPP test infrastructure is using two ways to interact with VPP for configuration, operational status check, tracing and logging.
|
||||
|
||||
### Using API commands
|
||||
API commands are executed by VPP API test tool that is started from command line as a sub-process. Command line attributes to start VPP API test tool are stored in class variable *vpp_api_test_cmdline*.
|
||||
When executed, API command and its possible output are printed in the terminal if verbosity level is greater then 0.
|
||||
|
||||
Example:
|
||||
|
||||
```
|
||||
cls.api("sw_interface_set_flags pg1 admin-up")
|
||||
```
|
||||
|
||||
will print in the terminal
|
||||
|
||||
```
|
||||
API: sw_interface_set_flags pg1 admin-up
|
||||
```
|
||||
|
||||
### Using CLI commands
|
||||
CLI commands are executed via VPP API test tool by sending API command "*exec + cli_command*". It is possible to set verbosity level for executing specific CLI commands, so that the CLI command is executed only and only if its associated verbosity level is equal or lower then the verbosity level set in the system.
|
||||
|
||||
Similarly to API commands, when executed, CLI command and its possible output are printed in the terminal if verbosity level is greater then 0.
|
||||
|
||||
Example I - CLI command will be executed always (its verbosity is 0):
|
||||
|
||||
```
|
||||
cls.cli(0, "show l2fib")
|
||||
```
|
||||
|
||||
Example II - CLI command will be executed only if the verbosity level is set to 2:
|
||||
|
||||
```
|
||||
self.cli(2, "show l2fib verbose")
|
||||
```
|
||||
|
||||
## Logging
|
||||
It is possible to log some additional information in the terminal for different verbosity levels.
|
||||
|
||||
Example I - verbosity level of the log is set to default value (0):
|
||||
|
||||
```
|
||||
self.log("Verifying capture %u" % i)
|
||||
```
|
||||
|
||||
will be always printed in the terminal:
|
||||
|
||||
```
|
||||
LOG: Verifying capture 0
|
||||
```
|
||||
|
||||
Example II - the log will be printed in the terminal only if the verbosity level is set to 2:
|
||||
|
||||
```
|
||||
self.log("Got packet on port %u: src=%u (id=%u)"
|
||||
% (o, payload_info.src, payload_info.index), 2)
|
||||
```
|
||||
|
||||
---
|
||||
***END***
|
261
test/test_ip.py
Normal file
261
test/test_ip.py
Normal file
@ -0,0 +1,261 @@
|
||||
#!/usr/bin/env python
|
||||
|
||||
import logging
|
||||
logging.getLogger("scapy.runtime").setLevel(logging.ERROR)
|
||||
|
||||
import unittest
|
||||
from framework import VppTestCase, VppTestRunner
|
||||
from util import Util
|
||||
|
||||
from scapy.packet import Raw
|
||||
from scapy.layers.l2 import Ether, ARP, Dot1Q
|
||||
from scapy.layers.inet import IP, UDP
|
||||
|
||||
|
||||
class TestIPv4(Util, VppTestCase):
|
||||
""" IPv4 Test Case """
|
||||
|
||||
@classmethod
|
||||
def setUpClass(cls):
|
||||
super(TestIPv4, cls).setUpClass()
|
||||
|
||||
try:
|
||||
cls.create_interfaces_and_subinterfaces()
|
||||
|
||||
# configure IPv4 on hardware interfaces
|
||||
cls.config_ip4(cls.interfaces)
|
||||
|
||||
cls.config_ip4_on_software_interfaces(cls.interfaces)
|
||||
|
||||
# resolve ARP using hardware interfaces
|
||||
cls.resolve_arp(cls.interfaces)
|
||||
|
||||
# let VPP know MAC addresses of peer (sub)interfaces
|
||||
cls.resolve_arp_on_software_interfaces(cls.interfaces)
|
||||
|
||||
# config 2M FIB enries
|
||||
cls.config_fib_entries(2000000)
|
||||
|
||||
except Exception as e:
|
||||
super(TestIPv4, cls).tearDownClass()
|
||||
raise
|
||||
|
||||
def tearDown(self):
|
||||
self.cli(2, "show int")
|
||||
self.cli(2, "show trace")
|
||||
self.cli(2, "show hardware")
|
||||
self.cli(2, "show ip arp")
|
||||
# self.cli(2, "show ip fib") # 2M entries
|
||||
self.cli(2, "show error")
|
||||
self.cli(2, "show run")
|
||||
|
||||
@classmethod
|
||||
def create_vlan_subif(cls, pg_index, vlan):
|
||||
cls.api("create_vlan_subif pg%u vlan %u" % (pg_index, vlan))
|
||||
|
||||
@classmethod
|
||||
def create_dot1ad_subif(cls, pg_index, sub_id, outer_vlan_id, inner_vlan_id):
|
||||
cls.api("create_subif pg%u sub_id %u outer_vlan_id %u inner_vlan_id %u dot1ad"
|
||||
% (pg_index, sub_id, outer_vlan_id, inner_vlan_id))
|
||||
|
||||
class SoftInt(object):
|
||||
pass
|
||||
|
||||
class HardInt(SoftInt):
|
||||
pass
|
||||
|
||||
class Subint(SoftInt):
|
||||
def __init__(self, sub_id):
|
||||
self.sub_id = sub_id
|
||||
|
||||
class Dot1QSubint(Subint):
|
||||
def __init__(self, sub_id, vlan=None):
|
||||
if vlan is None:
|
||||
vlan = sub_id
|
||||
super(TestIPv4.Dot1QSubint, self).__init__(sub_id)
|
||||
self.vlan = vlan
|
||||
|
||||
class Dot1ADSubint(Subint):
|
||||
def __init__(self, sub_id, outer_vlan, inner_vlan):
|
||||
super(TestIPv4.Dot1ADSubint, self).__init__(sub_id)
|
||||
self.outer_vlan = outer_vlan
|
||||
self.inner_vlan = inner_vlan
|
||||
|
||||
@classmethod
|
||||
def create_interfaces_and_subinterfaces(cls):
|
||||
cls.interfaces = range(3)
|
||||
|
||||
cls.create_interfaces(cls.interfaces)
|
||||
|
||||
# Make vpp_api_test see interfaces created using debug CLI (in function create_interfaces)
|
||||
cls.api("sw_interface_dump")
|
||||
|
||||
cls.INT_DETAILS = dict()
|
||||
|
||||
cls.INT_DETAILS[0] = cls.HardInt()
|
||||
|
||||
cls.INT_DETAILS[1] = cls.Dot1QSubint(100)
|
||||
cls.create_vlan_subif(1, cls.INT_DETAILS[1].vlan)
|
||||
|
||||
# FIXME: Wrong packet format/wrong layer on output of interface 2
|
||||
#self.INT_DETAILS[2] = self.Dot1ADSubint(10, 200, 300)
|
||||
#self.create_dot1ad_subif(2, self.INT_DETAILS[2].sub_id, self.INT_DETAILS[2].outer_vlan, self.INT_DETAILS[2].inner_vlan)
|
||||
|
||||
# Use dor1q for now
|
||||
cls.INT_DETAILS[2] = cls.Dot1QSubint(200)
|
||||
cls.create_vlan_subif(2, cls.INT_DETAILS[2].vlan)
|
||||
|
||||
for i in cls.interfaces:
|
||||
det = cls.INT_DETAILS[i]
|
||||
if isinstance(det, cls.Subint):
|
||||
cls.api("sw_interface_set_flags pg%u.%u admin-up" % (i, det.sub_id))
|
||||
|
||||
# IP adresses on subinterfaces
|
||||
MY_SOFT_IP4S = {}
|
||||
VPP_SOFT_IP4S = {}
|
||||
|
||||
@classmethod
|
||||
def config_ip4_on_software_interfaces(cls, args):
|
||||
for i in args:
|
||||
cls.MY_SOFT_IP4S[i] = "172.17.%u.2" % i
|
||||
cls.VPP_SOFT_IP4S[i] = "172.17.%u.1" % i
|
||||
if isinstance(cls.INT_DETAILS[i], cls.Subint):
|
||||
interface = "pg%u.%u" % (i, cls.INT_DETAILS[i].sub_id)
|
||||
else:
|
||||
interface = "pg%u" % i
|
||||
cls.api("sw_interface_add_del_address %s %s/24" % (interface, cls.VPP_SOFT_IP4S[i]))
|
||||
cls.log("My subinterface IPv4 address is %s" % (cls.MY_SOFT_IP4S[i]))
|
||||
|
||||
# let VPP know MAC addresses of peer (sub)interfaces
|
||||
@classmethod
|
||||
def resolve_arp_on_software_interfaces(cls, args):
|
||||
for i in args:
|
||||
ip = cls.VPP_SOFT_IP4S[i]
|
||||
cls.log("Sending ARP request for %s on port %u" % (ip, i))
|
||||
packet = (Ether(dst="ff:ff:ff:ff:ff:ff", src=cls.MY_MACS[i]) /
|
||||
ARP(op=ARP.who_has, pdst=ip, psrc=cls.MY_SOFT_IP4S[i], hwsrc=cls.MY_MACS[i]))
|
||||
|
||||
cls.add_dot1_layers(i, packet)
|
||||
|
||||
cls.pg_add_stream(i, packet)
|
||||
cls.pg_enable_capture([i])
|
||||
|
||||
cls.cli(2, "trace add pg-input 1")
|
||||
cls.pg_start()
|
||||
|
||||
# We don't need to read output
|
||||
|
||||
@classmethod
|
||||
def config_fib_entries(cls, count):
|
||||
n_int = len(cls.interfaces)
|
||||
for i in cls.interfaces:
|
||||
cls.api("ip_add_del_route 10.0.0.1/32 via %s count %u" % (cls.VPP_SOFT_IP4S[i], count / n_int))
|
||||
|
||||
@classmethod
|
||||
def add_dot1_layers(cls, i, packet):
|
||||
assert(type(packet) is Ether)
|
||||
payload = packet.payload
|
||||
det = cls.INT_DETAILS[i]
|
||||
if isinstance(det, cls.Dot1QSubint):
|
||||
packet.remove_payload()
|
||||
packet.add_payload(Dot1Q(vlan=det.sub_id) / payload)
|
||||
elif isinstance(det, cls.Dot1ADSubint):
|
||||
packet.remove_payload()
|
||||
packet.add_payload(Dot1Q(vlan=det.outer_vlan) / Dot1Q(vlan=det.inner_vlan) / payload)
|
||||
packet.type = 0x88A8
|
||||
|
||||
def remove_dot1_layers(self, i, packet):
|
||||
self.assertEqual(type(packet), Ether)
|
||||
payload = packet.payload
|
||||
det = self.INT_DETAILS[i]
|
||||
if isinstance(det, self.Dot1QSubint):
|
||||
self.assertEqual(type(payload), Dot1Q)
|
||||
self.assertEqual(payload.vlan, self.INT_DETAILS[i].vlan)
|
||||
payload = payload.payload
|
||||
elif isinstance(det, self.Dot1ADSubint): # TODO: change 88A8 type
|
||||
self.assertEqual(type(payload), Dot1Q)
|
||||
self.assertEqual(payload.vlan, self.INT_DETAILS[i].outer_vlan)
|
||||
payload = payload.payload
|
||||
self.assertEqual(type(payload), Dot1Q)
|
||||
self.assertEqual(payload.vlan, self.INT_DETAILS[i].inner_vlan)
|
||||
payload = payload.payload
|
||||
packet.remove_payload()
|
||||
packet.add_payload(payload)
|
||||
|
||||
def create_stream(self, pg_id):
|
||||
pg_targets = [None] * 3
|
||||
pg_targets[0] = [1, 2]
|
||||
pg_targets[1] = [0, 2]
|
||||
pg_targets[2] = [0, 1]
|
||||
pkts = []
|
||||
for i in range(0, 257):
|
||||
target_pg_id = pg_targets[pg_id][i % 2]
|
||||
info = self.create_packet_info(pg_id, target_pg_id)
|
||||
payload = self.info_to_payload(info)
|
||||
p = (Ether(dst=self.VPP_MACS[pg_id], src=self.MY_MACS[pg_id]) /
|
||||
IP(src=self.MY_SOFT_IP4S[pg_id], dst=self.MY_SOFT_IP4S[target_pg_id]) /
|
||||
UDP(sport=1234, dport=1234) /
|
||||
Raw(payload))
|
||||
info.data = p.copy()
|
||||
self.add_dot1_layers(pg_id, p)
|
||||
if not isinstance(self.INT_DETAILS[pg_id], self.Subint):
|
||||
packet_sizes = [64, 512, 1518, 9018]
|
||||
else:
|
||||
packet_sizes = [64, 512, 1518+4, 9018+4]
|
||||
size = packet_sizes[(i / 2) % len(packet_sizes)]
|
||||
self.extend_packet(p, size)
|
||||
pkts.append(p)
|
||||
return pkts
|
||||
|
||||
def verify_capture(self, o, capture):
|
||||
last_info = {}
|
||||
for i in self.interfaces:
|
||||
last_info[i] = None
|
||||
for packet in capture:
|
||||
self.remove_dot1_layers(o, packet) # Check VLAN tags and Ethernet header
|
||||
self.assertTrue(Dot1Q not in packet)
|
||||
try:
|
||||
ip = packet[IP]
|
||||
udp = packet[UDP]
|
||||
payload_info = self.payload_to_info(str(packet[Raw]))
|
||||
packet_index = payload_info.index
|
||||
src_pg = payload_info.src
|
||||
dst_pg = payload_info.dst
|
||||
self.assertEqual(dst_pg, o)
|
||||
self.log("Got packet on port %u: src=%u (id=%u)" % (o, src_pg, packet_index), 2)
|
||||
next_info = self.get_next_packet_info_for_interface2(src_pg, dst_pg, last_info[src_pg])
|
||||
last_info[src_pg] = next_info
|
||||
self.assertTrue(next_info is not None)
|
||||
self.assertEqual(packet_index, next_info.index)
|
||||
saved_packet = next_info.data
|
||||
# Check standard fields
|
||||
self.assertEqual(ip.src, saved_packet[IP].src)
|
||||
self.assertEqual(ip.dst, saved_packet[IP].dst)
|
||||
self.assertEqual(udp.sport, saved_packet[UDP].sport)
|
||||
self.assertEqual(udp.dport, saved_packet[UDP].dport)
|
||||
except:
|
||||
self.log("Unexpected or invalid packet:")
|
||||
packet.show()
|
||||
raise
|
||||
for i in self.interfaces:
|
||||
remaining_packet = self.get_next_packet_info_for_interface2(i, o, last_info[i])
|
||||
self.assertTrue(remaining_packet is None, "Port %u: Packet expected from source %u didn't arrive" % (o, i))
|
||||
|
||||
def test_fib(self):
|
||||
""" IPv4 FIB test """
|
||||
|
||||
for i in self.interfaces:
|
||||
pkts = self.create_stream(i)
|
||||
self.pg_add_stream(i, pkts)
|
||||
|
||||
self.pg_enable_capture(self.interfaces)
|
||||
self.pg_start()
|
||||
|
||||
for i in self.interfaces:
|
||||
out = self.pg_get_capture(i)
|
||||
self.log("Verifying capture %u" % i)
|
||||
self.verify_capture(i, out)
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
unittest.main(testRunner = VppTestRunner)
|
281
test/test_ip6.py
Normal file
281
test/test_ip6.py
Normal file
@ -0,0 +1,281 @@
|
||||
#!/usr/bin/env python
|
||||
|
||||
import logging
|
||||
logging.getLogger("scapy.runtime").setLevel(logging.ERROR)
|
||||
|
||||
import unittest
|
||||
from framework import VppTestCase, VppTestRunner
|
||||
from util import Util
|
||||
|
||||
from scapy.packet import Raw
|
||||
from scapy.layers.l2 import Ether, Dot1Q
|
||||
from scapy.layers.inet6 import (IPv6, UDP,
|
||||
ICMPv6ND_NS, ICMPv6NDOptSrcLLAddr,
|
||||
ICMPv6ND_NA, ICMPv6NDOptDstLLAddr)
|
||||
|
||||
|
||||
@unittest.skip('Not finished yet.\n')
|
||||
class TestIPv6(Util, VppTestCase):
|
||||
""" IPv6 Test Case """
|
||||
|
||||
@classmethod
|
||||
def setUpClass(cls):
|
||||
super(TestIPv6, cls).setUpClass()
|
||||
|
||||
try:
|
||||
cls.create_interfaces_and_subinterfaces()
|
||||
|
||||
# configure IPv6 on hardware interfaces
|
||||
cls.config_ip6(cls.interfaces)
|
||||
|
||||
cls.config_ip6_on_software_interfaces(cls.interfaces)
|
||||
|
||||
# resolve ICMPv6 ND using hardware interfaces
|
||||
cls.resolve_icmpv6_nd(cls.interfaces)
|
||||
|
||||
# let VPP know MAC addresses of peer (sub)interfaces
|
||||
# cls.resolve_icmpv6_nd_on_software_interfaces(cls.interfaces)
|
||||
cls.send_neighbour_advertisement_on_software_interfaces(cls.interfaces)
|
||||
|
||||
# config 2M FIB enries
|
||||
#cls.config_fib_entries(2000000)
|
||||
cls.config_fib_entries(1000000)
|
||||
|
||||
except Exception as e:
|
||||
super(TestIPv6, cls).tearDownClass()
|
||||
raise
|
||||
|
||||
def tearDown(self):
|
||||
self.cli(2, "show int")
|
||||
self.cli(2, "show trace")
|
||||
self.cli(2, "show hardware")
|
||||
self.cli(2, "show ip arp")
|
||||
# self.cli(2, "show ip fib") # 2M entries
|
||||
self.cli(2, "show error")
|
||||
self.cli(2, "show run")
|
||||
|
||||
@classmethod
|
||||
def create_vlan_subif(cls, pg_index, vlan):
|
||||
cls.api("create_vlan_subif pg%u vlan %u" % (pg_index, vlan))
|
||||
|
||||
@classmethod
|
||||
def create_dot1ad_subif(cls, pg_index, sub_id, outer_vlan_id, inner_vlan_id):
|
||||
cls.api("create_subif pg%u sub_id %u outer_vlan_id %u inner_vlan_id %u dot1ad"
|
||||
% (pg_index, sub_id, outer_vlan_id, inner_vlan_id))
|
||||
|
||||
class SoftInt(object):
|
||||
pass
|
||||
|
||||
class HardInt(SoftInt):
|
||||
pass
|
||||
|
||||
class Subint(SoftInt):
|
||||
def __init__(self, sub_id):
|
||||
self.sub_id = sub_id
|
||||
|
||||
class Dot1QSubint(Subint):
|
||||
def __init__(self, sub_id, vlan=None):
|
||||
if vlan is None:
|
||||
vlan = sub_id
|
||||
super(TestIPv6.Dot1QSubint, self).__init__(sub_id)
|
||||
self.vlan = vlan
|
||||
|
||||
class Dot1ADSubint(Subint):
|
||||
def __init__(self, sub_id, outer_vlan, inner_vlan):
|
||||
super(TestIPv6.Dot1ADSubint, self).__init__(sub_id)
|
||||
self.outer_vlan = outer_vlan
|
||||
self.inner_vlan = inner_vlan
|
||||
|
||||
@classmethod
|
||||
def create_interfaces_and_subinterfaces(cls):
|
||||
cls.interfaces = range(3)
|
||||
|
||||
cls.create_interfaces(cls.interfaces)
|
||||
|
||||
# Make vpp_api_test see interfaces created using debug CLI (in function create_interfaces)
|
||||
cls.api("sw_interface_dump")
|
||||
|
||||
cls.INT_DETAILS = dict()
|
||||
|
||||
cls.INT_DETAILS[0] = cls.HardInt()
|
||||
|
||||
cls.INT_DETAILS[1] = cls.Dot1QSubint(100)
|
||||
cls.create_vlan_subif(1, cls.INT_DETAILS[1].vlan)
|
||||
|
||||
# FIXME: Wrong packet format/wrong layer on output of interface 2
|
||||
#self.INT_DETAILS[2] = self.Dot1ADSubint(10, 200, 300)
|
||||
#self.create_dot1ad_subif(2, self.INT_DETAILS[2].sub_id, self.INT_DETAILS[2].outer_vlan, self.INT_DETAILS[2].inner_vlan)
|
||||
|
||||
# Use dor1q for now
|
||||
cls.INT_DETAILS[2] = cls.Dot1QSubint(200)
|
||||
cls.create_vlan_subif(2, cls.INT_DETAILS[2].vlan)
|
||||
|
||||
for i in cls.interfaces:
|
||||
det = cls.INT_DETAILS[i]
|
||||
if isinstance(det, cls.Subint):
|
||||
cls.api("sw_interface_set_flags pg%u.%u admin-up" % (i, det.sub_id))
|
||||
|
||||
# IP adresses on subinterfaces
|
||||
MY_SOFT_IP6S = {}
|
||||
VPP_SOFT_IP6S = {}
|
||||
|
||||
@classmethod
|
||||
def config_ip6_on_software_interfaces(cls, args):
|
||||
for i in args:
|
||||
cls.MY_SOFT_IP6S[i] = "fd01:%u::2" % i
|
||||
cls.VPP_SOFT_IP6S[i] = "fd01:%u::1" % i
|
||||
if isinstance(cls.INT_DETAILS[i], cls.Subint):
|
||||
interface = "pg%u.%u" % (i, cls.INT_DETAILS[i].sub_id)
|
||||
else:
|
||||
interface = "pg%u" % i
|
||||
cls.api("sw_interface_add_del_address %s %s/32" % (interface, cls.VPP_SOFT_IP6S[i]))
|
||||
cls.log("My subinterface IPv6 address is %s" % (cls.MY_SOFT_IP6S[i]))
|
||||
|
||||
# let VPP know MAC addresses of peer (sub)interfaces
|
||||
@classmethod
|
||||
def resolve_icmpv6_nd_on_software_interfaces(cls, args):
|
||||
for i in args:
|
||||
ip = cls.VPP_SOFT_IP6S[i]
|
||||
cls.log("Sending ICMPv6ND_NS request for %s on port %u" % (ip, i))
|
||||
nd_req = (Ether(dst="ff:ff:ff:ff:ff:ff", src=cls.MY_MACS[i]) /
|
||||
IPv6(src=cls.MY_SOFT_IP6S[i], dst=ip) /
|
||||
ICMPv6ND_NS(tgt=ip) /
|
||||
ICMPv6NDOptSrcLLAddr(lladdr=cls.MY_MACS[i]))
|
||||
cls.pg_add_stream(i, nd_req)
|
||||
cls.pg_enable_capture([i])
|
||||
|
||||
cls.cli(2, "trace add pg-input 1")
|
||||
cls.pg_start()
|
||||
|
||||
# We don't need to read output
|
||||
|
||||
# let VPP know MAC addresses of peer (sub)interfaces
|
||||
@classmethod
|
||||
def send_neighbour_advertisement_on_software_interfaces(cls, args):
|
||||
for i in args:
|
||||
ip = cls.VPP_SOFT_IP6S[i]
|
||||
cls.log("Sending ICMPv6ND_NA message for %s on port %u" % (ip, i))
|
||||
pkt = (Ether(dst="ff:ff:ff:ff:ff:ff", src=cls.MY_MACS[i]) /
|
||||
IPv6(src=cls.MY_SOFT_IP6S[i], dst=ip) /
|
||||
ICMPv6ND_NA(tgt=ip, R=0, S=0) /
|
||||
ICMPv6NDOptDstLLAddr(lladdr=cls.MY_MACS[i]))
|
||||
cls.pg_add_stream(i, pkt)
|
||||
cls.pg_enable_capture([i])
|
||||
|
||||
cls.cli(2, "trace add pg-input 1")
|
||||
cls.pg_start()
|
||||
|
||||
@classmethod
|
||||
def config_fib_entries(cls, count):
|
||||
n_int = len(cls.interfaces)
|
||||
for i in cls.interfaces:
|
||||
cls.api("ip_add_del_route fd02::1/128 via %s count %u" % (cls.VPP_SOFT_IP6S[i], count / n_int))
|
||||
|
||||
@classmethod
|
||||
def add_dot1_layers(cls, i, packet):
|
||||
assert(type(packet) is Ether)
|
||||
payload = packet.payload
|
||||
det = cls.INT_DETAILS[i]
|
||||
if isinstance(det, cls.Dot1QSubint):
|
||||
packet.remove_payload()
|
||||
packet.add_payload(Dot1Q(vlan=det.sub_id) / payload)
|
||||
elif isinstance(det, cls.Dot1ADSubint):
|
||||
packet.remove_payload()
|
||||
packet.add_payload(Dot1Q(vlan=det.outer_vlan) / Dot1Q(vlan=det.inner_vlan) / payload)
|
||||
packet.type = 0x88A8
|
||||
|
||||
def remove_dot1_layers(self, i, packet):
|
||||
self.assertEqual(type(packet), Ether)
|
||||
payload = packet.payload
|
||||
det = self.INT_DETAILS[i]
|
||||
if isinstance(det, self.Dot1QSubint):
|
||||
self.assertEqual(type(payload), Dot1Q)
|
||||
self.assertEqual(payload.vlan, self.INT_DETAILS[i].vlan)
|
||||
payload = payload.payload
|
||||
elif isinstance(det, self.Dot1ADSubint): # TODO: change 88A8 type
|
||||
self.assertEqual(type(payload), Dot1Q)
|
||||
self.assertEqual(payload.vlan, self.INT_DETAILS[i].outer_vlan)
|
||||
payload = payload.payload
|
||||
self.assertEqual(type(payload), Dot1Q)
|
||||
self.assertEqual(payload.vlan, self.INT_DETAILS[i].inner_vlan)
|
||||
payload = payload.payload
|
||||
packet.remove_payload()
|
||||
packet.add_payload(payload)
|
||||
|
||||
def create_stream(self, pg_id):
|
||||
pg_targets = [None] * 3
|
||||
pg_targets[0] = [1, 2]
|
||||
pg_targets[1] = [0, 2]
|
||||
pg_targets[2] = [0, 1]
|
||||
pkts = []
|
||||
for i in range(0, 257):
|
||||
target_pg_id = pg_targets[pg_id][i % 2]
|
||||
info = self.create_packet_info(pg_id, target_pg_id)
|
||||
payload = self.info_to_payload(info)
|
||||
p = (Ether(dst=self.VPP_MACS[pg_id], src=self.MY_MACS[pg_id]) /
|
||||
IPv6(src=self.MY_SOFT_IP6S[pg_id], dst=self.MY_SOFT_IP6S[target_pg_id]) /
|
||||
UDP(sport=1234, dport=1234) /
|
||||
Raw(payload))
|
||||
info.data = p.copy()
|
||||
self.add_dot1_layers(pg_id, p)
|
||||
if not isinstance(self.INT_DETAILS[pg_id], self.Subint):
|
||||
packet_sizes = [76, 512, 1518, 9018]
|
||||
else:
|
||||
packet_sizes = [76, 512, 1518+4, 9018+4]
|
||||
size = packet_sizes[(i / 2) % len(packet_sizes)]
|
||||
self.extend_packet(p, size)
|
||||
pkts.append(p)
|
||||
return pkts
|
||||
|
||||
def verify_capture(self, o, capture):
|
||||
last_info = {}
|
||||
for i in self.interfaces:
|
||||
last_info[i] = None
|
||||
for packet in capture:
|
||||
self.remove_dot1_layers(o, packet) # Check VLAN tags and Ethernet header
|
||||
self.assertTrue(Dot1Q not in packet)
|
||||
try:
|
||||
ip = packet[IPv6]
|
||||
udp = packet[UDP]
|
||||
payload_info = self.payload_to_info(str(packet[Raw]))
|
||||
packet_index = payload_info.index
|
||||
src_pg = payload_info.src
|
||||
dst_pg = payload_info.dst
|
||||
self.assertEqual(dst_pg, o)
|
||||
self.log("Got packet on port %u: src=%u (id=%u)" % (o, src_pg, packet_index), 2)
|
||||
next_info = self.get_next_packet_info_for_interface2(src_pg, dst_pg, last_info[src_pg])
|
||||
last_info[src_pg] = next_info
|
||||
self.assertTrue(next_info is not None)
|
||||
self.assertEqual(packet_index, next_info.index)
|
||||
saved_packet = next_info.data
|
||||
# Check standard fields
|
||||
self.assertEqual(ip.src, saved_packet[IPv6].src)
|
||||
self.assertEqual(ip.dst, saved_packet[IPv6].dst)
|
||||
self.assertEqual(udp.sport, saved_packet[UDP].sport)
|
||||
self.assertEqual(udp.dport, saved_packet[UDP].dport)
|
||||
except:
|
||||
self.log("Unexpected or invalid packet:")
|
||||
packet.show()
|
||||
raise
|
||||
for i in self.interfaces:
|
||||
remaining_packet = self.get_next_packet_info_for_interface2(i, o, last_info[i])
|
||||
self.assertTrue(remaining_packet is None, "Port %u: Packet expected from source %u didn't arrive" % (o, i))
|
||||
|
||||
def test_fib(self):
|
||||
""" IPv6 FIB test """
|
||||
|
||||
for i in self.interfaces:
|
||||
pkts = self.create_stream(i)
|
||||
self.pg_add_stream(i, pkts)
|
||||
|
||||
self.pg_enable_capture(self.interfaces)
|
||||
self.pg_start()
|
||||
|
||||
for i in self.interfaces:
|
||||
out = self.pg_get_capture(i)
|
||||
self.log("Verifying capture %u" % i)
|
||||
self.verify_capture(i, out)
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
unittest.main(testRunner = VppTestRunner)
|
473
test/test_l2bd.py
Normal file
473
test/test_l2bd.py
Normal file
File diff suppressed because it is too large
Load Diff
243
test/test_l2xc.py
Normal file
243
test/test_l2xc.py
Normal file
@ -0,0 +1,243 @@
|
||||
#!/usr/bin/env python
|
||||
## @file test_l2xc.py
|
||||
# Module to provide L2 cross-connect test case.
|
||||
#
|
||||
# The module provides a set of tools for L2 cross-connect tests.
|
||||
|
||||
import logging
|
||||
logging.getLogger("scapy.runtime").setLevel(logging.ERROR)
|
||||
|
||||
import unittest
|
||||
import random
|
||||
from framework import VppTestCase, VppTestRunner
|
||||
from scapy.layers.l2 import Ether, Raw
|
||||
from scapy.layers.inet import IP, UDP
|
||||
|
||||
|
||||
## Subclass of the VppTestCase class.
|
||||
#
|
||||
# This subclass is a class for L2 cross-connect test cases. It provides methods
|
||||
# to create interfaces, configuring L2 cross-connects, creating and verifying
|
||||
# packet streams.
|
||||
class TestL2xc(VppTestCase):
|
||||
""" L2XC Test Case """
|
||||
|
||||
# Test variables
|
||||
interf_nr = 4 # Number of interfaces
|
||||
hosts_nr = 10 # Number of hosts
|
||||
pkts_per_burst = 257 # Number of packets per burst
|
||||
|
||||
## Class method to start the test case.
|
||||
# Overrides setUpClass method in VppTestCase class.
|
||||
# There is used try..except statement to ensure that the tear down of
|
||||
# the class will be executed even if any exception is raised.
|
||||
# @param cls The class pointer.
|
||||
@classmethod
|
||||
def setUpClass(cls):
|
||||
super(TestL2xc, cls).setUpClass()
|
||||
|
||||
try:
|
||||
## Create interfaces
|
||||
cls.interfaces = range(TestL2xc.interf_nr)
|
||||
cls.create_interfaces(cls.interfaces)
|
||||
|
||||
## Create bi-directional cross-connects between pg0 and pg1
|
||||
cls.api("sw_interface_set_l2_xconnect rx pg0 tx pg1 enable")
|
||||
cls.api("sw_interface_set_l2_xconnect rx pg1 tx pg0 enable")
|
||||
|
||||
## Create bi-directional cross-connects between pg2 and pg3
|
||||
cls.api("sw_interface_set_l2_xconnect rx pg2 tx pg3 enable")
|
||||
cls.api("sw_interface_set_l2_xconnect rx pg3 tx pg2 enable")
|
||||
|
||||
cls.cli(0, "show l2patch")
|
||||
|
||||
## Create host MAC and IPv4 lists
|
||||
cls.create_host_lists(TestL2xc.hosts_nr)
|
||||
|
||||
except Exception as e:
|
||||
cls.tearDownClass()
|
||||
raise e
|
||||
|
||||
## Method to define tear down VPP actions of the test case.
|
||||
# Overrides tearDown method in VppTestCase class.
|
||||
# @param self The object pointer.
|
||||
def tearDown(self):
|
||||
self.cli(2, "show int")
|
||||
self.cli(2, "show trace")
|
||||
self.cli(2, "show hardware")
|
||||
self.cli(2, "show l2patch")
|
||||
self.cli(2, "show error")
|
||||
self.cli(2, "show run")
|
||||
|
||||
## Class method to create required number of MAC and IPv4 addresses.
|
||||
# Create required number of host MAC addresses and distribute them among
|
||||
# interfaces. Create host IPv4 address for every host MAC address too.
|
||||
# @param cls The class pointer.
|
||||
# @param count Integer variable to store the number of MAC addresses to be
|
||||
# created.
|
||||
@classmethod
|
||||
def create_host_lists(cls, count):
|
||||
for i in cls.interfaces:
|
||||
cls.MY_MACS[i] = []
|
||||
cls.MY_IP4S[i] = []
|
||||
for j in range(0, count):
|
||||
cls.MY_MACS[i].append("00:00:00:ff:%02x:%02x" % (i, j))
|
||||
cls.MY_IP4S[i].append("172.17.1%02x.%u" % (i, j))
|
||||
## @var MY_MACS
|
||||
# Dictionary variable to store list of MAC addresses per interface.
|
||||
## @var MY_IP4S
|
||||
# Dictionary variable to store list of IPv4 addresses per interface.
|
||||
|
||||
## Method to create packet stream for the packet generator interface.
|
||||
# Create input packet stream for the given packet generator interface with
|
||||
# packets of different length targeted for all other created packet
|
||||
# generator interfaces.
|
||||
# @param self The object pointer.
|
||||
# @param pg_id Integer variable to store the index of the interface to
|
||||
# create the input packet stream.
|
||||
# @return pkts List variable to store created input stream of packets.
|
||||
def create_stream(self, pg_id):
|
||||
# TODO: use variables to create lists based on interface number
|
||||
pg_targets = [None] * 4
|
||||
pg_targets[0] = [1]
|
||||
pg_targets[1] = [0]
|
||||
pg_targets[2] = [3]
|
||||
pg_targets[3] = [2]
|
||||
pkts = []
|
||||
for i in range(0, TestL2xc.pkts_per_burst):
|
||||
target_pg_id = pg_targets[pg_id][0]
|
||||
target_host_id = random.randrange(len(self.MY_MACS[target_pg_id]))
|
||||
source_host_id = random.randrange(len(self.MY_MACS[pg_id]))
|
||||
pkt_info = self.create_packet_info(pg_id, target_pg_id)
|
||||
payload = self.info_to_payload(pkt_info)
|
||||
p = (Ether(dst=self.MY_MACS[target_pg_id][target_host_id],
|
||||
src=self.MY_MACS[pg_id][source_host_id]) /
|
||||
IP(src=self.MY_IP4S[pg_id][source_host_id],
|
||||
dst=self.MY_IP4S[target_pg_id][target_host_id]) /
|
||||
UDP(sport=1234, dport=1234) /
|
||||
Raw(payload))
|
||||
pkt_info.data = p.copy()
|
||||
packet_sizes = [64, 512, 1518, 9018]
|
||||
size = packet_sizes[(i / 2) % len(packet_sizes)]
|
||||
self.extend_packet(p, size)
|
||||
pkts.append(p)
|
||||
return pkts
|
||||
## @var pg_targets
|
||||
# List variable to store list of indexes of target packet generator
|
||||
# interfaces for every source packet generator interface.
|
||||
## @var target_pg_id
|
||||
# Integer variable to store the index of the random target packet
|
||||
# generator interfaces.
|
||||
## @var target_host_id
|
||||
# Integer variable to store the index of the randomly chosen
|
||||
# destination host MAC/IPv4 address.
|
||||
## @var source_host_id
|
||||
# Integer variable to store the index of the randomly chosen source
|
||||
# host MAC/IPv4 address.
|
||||
## @var pkt_info
|
||||
# Object variable to store the information about the generated packet.
|
||||
## @var payload
|
||||
# String variable to store the payload of the packet to be generated.
|
||||
## @var p
|
||||
# Object variable to store the generated packet.
|
||||
## @var packet_sizes
|
||||
# List variable to store required packet sizes.
|
||||
## @var size
|
||||
# List variable to store required packet sizes.
|
||||
|
||||
## Method to verify packet stream received on the packet generator interface.
|
||||
# Verify packet-by-packet the output stream captured on a given packet
|
||||
# generator (pg) interface using following packet payload data - order of
|
||||
# packet in the stream, index of the source and destination pg interface,
|
||||
# src and dst host IPv4 addresses and src port and dst port values of UDP
|
||||
# layer.
|
||||
# @param self The object pointer.
|
||||
# @param o Integer variable to store the index of the interface to
|
||||
# verify the output packet stream.
|
||||
# @param capture List variable to store the captured output packet stream.
|
||||
def verify_capture(self, o, capture):
|
||||
last_info = {}
|
||||
for i in self.interfaces:
|
||||
last_info[i] = None
|
||||
for packet in capture:
|
||||
try:
|
||||
ip = packet[IP]
|
||||
udp = packet[UDP]
|
||||
payload_info = self.payload_to_info(str(packet[Raw]))
|
||||
self.assertEqual(payload_info.dst, o)
|
||||
self.log("Got packet on port %u: src=%u (id=%u)"
|
||||
% (o, payload_info.src, payload_info.index), 2)
|
||||
next_info = self.get_next_packet_info_for_interface2(
|
||||
payload_info.src, payload_info.dst,
|
||||
last_info[payload_info.src])
|
||||
last_info[payload_info.src] = next_info
|
||||
self.assertTrue(next_info is not None)
|
||||
self.assertEqual(payload_info.index, next_info.index)
|
||||
# Check standard fields
|
||||
self.assertEqual(ip.src, next_info.data[IP].src)
|
||||
self.assertEqual(ip.dst, next_info.data[IP].dst)
|
||||
self.assertEqual(udp.sport, next_info.data[UDP].sport)
|
||||
self.assertEqual(udp.dport, next_info.data[UDP].dport)
|
||||
except:
|
||||
self.log("Unexpected or invalid packet:")
|
||||
packet.show()
|
||||
raise
|
||||
for i in self.interfaces:
|
||||
remaining_packet = self.get_next_packet_info_for_interface2(
|
||||
i, o, last_info[i])
|
||||
self.assertTrue(remaining_packet is None,
|
||||
"Port %u: Packet expected from source %u didn't"
|
||||
" arrive" % (o, i))
|
||||
## @var last_info
|
||||
# Dictionary variable to store verified packets per packet generator
|
||||
# interface.
|
||||
## @var ip
|
||||
# Object variable to store the IP layer of the packet.
|
||||
## @var udp
|
||||
# Object variable to store the UDP layer of the packet.
|
||||
## @var payload_info
|
||||
# Object variable to store required information about the packet.
|
||||
## @var next_info
|
||||
# Object variable to store information about next packet.
|
||||
## @var remaining_packet
|
||||
# Object variable to store information about remaining packet.
|
||||
|
||||
## Method defining L2 cross-connect test case.
|
||||
# Contains steps of the test case.
|
||||
# @param self The object pointer.
|
||||
def test_l2xc(self):
|
||||
""" L2XC test
|
||||
|
||||
Test scenario:
|
||||
1.config
|
||||
2 pairs of 2 interfaces, l2xconnected
|
||||
|
||||
2.sending l2 eth packets between 4 interfaces
|
||||
64B, 512B, 1518B, 9018B (ether_size)
|
||||
burst of packets per interface
|
||||
"""
|
||||
|
||||
## Create incoming packet streams for packet-generator interfaces
|
||||
for i in self.interfaces:
|
||||
pkts = self.create_stream(i)
|
||||
self.pg_add_stream(i, pkts)
|
||||
|
||||
## Enable packet capturing and start packet sending
|
||||
self.pg_enable_capture(self.interfaces)
|
||||
self.pg_start()
|
||||
|
||||
## Verify outgoing packet streams per packet-generator interface
|
||||
for i in self.interfaces:
|
||||
out = self.pg_get_capture(i)
|
||||
self.log("Verifying capture %u" % i)
|
||||
self.verify_capture(i, out)
|
||||
## @var pkts
|
||||
# List variable to store created input stream of packets for the packet
|
||||
# generator interface.
|
||||
## @var out
|
||||
# List variable to store captured output stream of packets for
|
||||
# the packet generator interface.
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
unittest.main(testRunner = VppTestRunner)
|
102
test/test_vxlan.py
Normal file
102
test/test_vxlan.py
Normal file
@ -0,0 +1,102 @@
|
||||
#!/usr/bin/env python
|
||||
|
||||
import unittest
|
||||
from framework import VppTestCase, VppTestRunner
|
||||
from util import Util
|
||||
from template_bd import BridgeDomain
|
||||
|
||||
from scapy.layers.l2 import Ether
|
||||
from scapy.layers.inet import IP, UDP
|
||||
from scapy_handlers.vxlan import VXLAN
|
||||
|
||||
|
||||
## TestVxlan is a subclass of BridgeDomain, Util, VppTestCase classes.
|
||||
#
|
||||
# TestVxlan class defines VXLAN test cases for VXLAN encapsulation,
|
||||
# decapsulation and VXLAN tunnel termination in L2 bridge-domain.
|
||||
class TestVxlan(BridgeDomain, Util, VppTestCase):
|
||||
""" VXLAN Test Case """
|
||||
|
||||
## Method to initialize all parent classes.
|
||||
#
|
||||
# Initialize BridgeDomain objects, set documentation string for inherited
|
||||
# tests and initialize VppTestCase object which must be called after
|
||||
# doc strings are set.
|
||||
def __init__(self, *args):
|
||||
BridgeDomain.__init__(self)
|
||||
self.test_decap.__func__.__doc__ = ' VXLAN BD decapsulation '
|
||||
self.test_encap.__func__.__doc__ = ' VXLAN BD encapsulation '
|
||||
VppTestCase.__init__(self, *args)
|
||||
|
||||
## Method for VXLAN encapsulate function.
|
||||
#
|
||||
# Encapsulate the original payload frame by adding VXLAN header with its
|
||||
# UDP, IP and Ethernet fields.
|
||||
def encapsulate(self, pkt):
|
||||
return (Ether(src=self.MY_MACS[0], dst=self.VPP_MACS[0]) /
|
||||
IP(src=self.MY_IP4S[0], dst=self.VPP_IP4S[0]) /
|
||||
UDP(sport=4789, dport=4789, chksum=0) /
|
||||
VXLAN(vni=1) /
|
||||
pkt)
|
||||
|
||||
## Method for VXLAN decapsulate function.
|
||||
#
|
||||
# Decapsulate the original payload frame by removing VXLAN header with
|
||||
# its UDP, IP and Ethernet fields.
|
||||
def decapsulate(self, pkt):
|
||||
return pkt[VXLAN].payload
|
||||
|
||||
## Method for checking VXLAN encapsulation.
|
||||
#
|
||||
def check_encapsulation(self, pkt):
|
||||
# TODO: add error messages
|
||||
## Verify source MAC is VPP_MAC and destination MAC is MY_MAC resolved
|
||||
# by VPP using ARP.
|
||||
self.assertEqual(pkt[Ether].src, self.VPP_MACS[0])
|
||||
self.assertEqual(pkt[Ether].dst, self.MY_MACS[0])
|
||||
## Verify VXLAN tunnel source IP is VPP_IP and destination IP is MY_IP.
|
||||
self.assertEqual(pkt[IP].src, self.VPP_IP4S[0])
|
||||
self.assertEqual(pkt[IP].dst, self.MY_IP4S[0])
|
||||
## Verify UDP destination port is VXLAN 4789, source UDP port could be
|
||||
# arbitrary.
|
||||
self.assertEqual(pkt[UDP].dport, 4789)
|
||||
# TODO: checksum check
|
||||
## Verify VNI, based on configuration it must be 1.
|
||||
self.assertEqual(pkt[VXLAN].vni, 1)
|
||||
|
||||
## Class method to start the VXLAN test case.
|
||||
# Overrides setUpClass method in VppTestCase class.
|
||||
# Python try..except statement is used to ensure that the tear down of
|
||||
# the class will be executed even if exception is raised.
|
||||
# @param cls The class pointer.
|
||||
@classmethod
|
||||
def setUpClass(cls):
|
||||
super(TestVxlan, cls).setUpClass()
|
||||
try:
|
||||
## Create 2 pg interfaces.
|
||||
cls.create_interfaces(range(2))
|
||||
## Configure IPv4 addresses on VPP pg0.
|
||||
cls.config_ip4([0])
|
||||
## Resolve MAC address for VPP's IP address on pg0.
|
||||
cls.resolve_arp([0])
|
||||
|
||||
## Create VXLAN VTEP on VPP pg0, and put vxlan_tunnel0 and pg1
|
||||
# into BD.
|
||||
cls.api("vxlan_add_del_tunnel src %s dst %s vni 1" %
|
||||
(cls.VPP_IP4S[0], cls.MY_IP4S[0]))
|
||||
cls.api("sw_interface_set_l2_bridge vxlan_tunnel0 bd_id 1")
|
||||
cls.api("sw_interface_set_l2_bridge pg1 bd_id 1")
|
||||
except:
|
||||
## In case setUpClass fails run tear down.
|
||||
cls.tearDownClass()
|
||||
raise
|
||||
|
||||
## Method to define VPP actions before tear down of the test case.
|
||||
# Overrides tearDown method in VppTestCase class.
|
||||
# @param self The object pointer.
|
||||
def tearDown(self):
|
||||
super(TestVxlan, self).tearDown()
|
||||
self.cli(2, "show bridge-domain 1 detail")
|
||||
|
||||
if __name__ == '__main__':
|
||||
unittest.main(testRunner=VppTestRunner)
|
139
test/util.py
Normal file
139
test/util.py
Normal file
@ -0,0 +1,139 @@
|
||||
## @package util
|
||||
# Module with common functions that should be used by the test cases.
|
||||
#
|
||||
# The module provides a set of tools for setup the test environment
|
||||
|
||||
from scapy.layers.l2 import Ether, ARP
|
||||
from scapy.layers.inet6 import IPv6, ICMPv6ND_NS, ICMPv6NDOptSrcLLAddr
|
||||
|
||||
|
||||
## Util class
|
||||
#
|
||||
# Test cases that want to use methods defined in Util class should
|
||||
# inherit this class.
|
||||
#
|
||||
# class Example(Util, VppTestCase):
|
||||
# pass
|
||||
class Util(object):
|
||||
|
||||
## Class method to send ARP Request for each VPP IPv4 address in
|
||||
# order to determine VPP interface MAC address to IPv4 bindings.
|
||||
#
|
||||
# Resolved MAC address is saved to the VPP_MACS dictionary with interface
|
||||
# index as a key. ARP Request is sent from MAC in MY_MACS dictionary with
|
||||
# interface index as a key.
|
||||
# @param cls The class pointer.
|
||||
# @param args List variable to store indices of VPP interfaces.
|
||||
@classmethod
|
||||
def resolve_arp(cls, args):
|
||||
for i in args:
|
||||
ip = cls.VPP_IP4S[i]
|
||||
cls.log("Sending ARP request for %s on port %u" % (ip, i))
|
||||
arp_req = (Ether(dst="ff:ff:ff:ff:ff:ff", src=cls.MY_MACS[i]) /
|
||||
ARP(op=ARP.who_has, pdst=ip,
|
||||
psrc=cls.MY_IP4S[i], hwsrc=cls.MY_MACS[i]))
|
||||
cls.pg_add_stream(i, arp_req)
|
||||
cls.pg_enable_capture([i])
|
||||
|
||||
cls.cli(2, "trace add pg-input 1")
|
||||
cls.pg_start()
|
||||
arp_reply = cls.pg_get_capture(i)[0]
|
||||
if arp_reply[ARP].op == ARP.is_at:
|
||||
cls.log("VPP pg%u MAC address is %s " % (i, arp_reply[ARP].hwsrc))
|
||||
cls.VPP_MACS[i] = arp_reply[ARP].hwsrc
|
||||
else:
|
||||
cls.log("No ARP received on port %u" % i)
|
||||
cls.cli(2, "show trace")
|
||||
## @var ip
|
||||
# <TODO add description>
|
||||
## @var arp_req
|
||||
# <TODO add description>
|
||||
## @var arp_reply
|
||||
# <TODO add description>
|
||||
## @var VPP_MACS
|
||||
# <TODO add description>
|
||||
|
||||
## Class method to send ND request for each VPP IPv6 address in
|
||||
# order to determine VPP MAC address to IPv6 bindings.
|
||||
#
|
||||
# Resolved MAC address is saved to the VPP_MACS dictionary with interface
|
||||
# index as a key. ND Request is sent from MAC in MY_MACS dictionary with
|
||||
# interface index as a key.
|
||||
# @param cls The class pointer.
|
||||
# @param args List variable to store indices of VPP interfaces.
|
||||
@classmethod
|
||||
def resolve_icmpv6_nd(cls, args):
|
||||
for i in args:
|
||||
ip = cls.VPP_IP6S[i]
|
||||
cls.log("Sending ICMPv6ND_NS request for %s on port %u" % (ip, i))
|
||||
nd_req = (Ether(dst="ff:ff:ff:ff:ff:ff", src=cls.MY_MACS[i]) /
|
||||
IPv6(src=cls.MY_IP6S[i], dst=ip) /
|
||||
ICMPv6ND_NS(tgt=ip) /
|
||||
ICMPv6NDOptSrcLLAddr(lladdr=cls.MY_MACS[i]))
|
||||
cls.pg_add_stream(i, nd_req)
|
||||
cls.pg_enable_capture([i])
|
||||
|
||||
cls.cli(2, "trace add pg-input 1")
|
||||
cls.pg_start()
|
||||
nd_reply = cls.pg_get_capture(i)[0]
|
||||
icmpv6_na = nd_reply['ICMPv6 Neighbor Discovery - Neighbor Advertisement']
|
||||
dst_ll_addr = icmpv6_na['ICMPv6 Neighbor Discovery Option - Destination Link-Layer Address']
|
||||
cls.VPP_MACS[i] = dst_ll_addr.lladdr
|
||||
## @var ip
|
||||
# <TODO add description>
|
||||
## @var nd_req
|
||||
# <TODO add description>
|
||||
## @var nd_reply
|
||||
# <TODO add description>
|
||||
## @var icmpv6_na
|
||||
# <TODO add description>
|
||||
## @var dst_ll_addr
|
||||
# <TODO add description>
|
||||
## @var VPP_MACS
|
||||
# <TODO add description>
|
||||
|
||||
## Class method to configure IPv4 addresses on VPP interfaces.
|
||||
#
|
||||
# Set dictionary variables MY_IP4S and VPP_IP4S to IPv4 addresses
|
||||
# calculated using interface VPP interface index as a parameter.
|
||||
# /24 IPv4 prefix is used, with VPP interface address host part set
|
||||
# to .1 and MY address set to .2.
|
||||
# Used IPv4 prefix scheme: 172.16.{VPP-interface-index}.0/24.
|
||||
# @param cls The class pointer.
|
||||
# @param args List variable to store indices of VPP interfaces.
|
||||
@classmethod
|
||||
def config_ip4(cls, args):
|
||||
for i in args:
|
||||
cls.MY_IP4S[i] = "172.16.%u.2" % i
|
||||
cls.VPP_IP4S[i] = "172.16.%u.1" % i
|
||||
cls.api("sw_interface_add_del_address pg%u %s/24" % (i, cls.VPP_IP4S[i]))
|
||||
cls.log("My IPv4 address is %s" % (cls.MY_IP4S[i]))
|
||||
## @var MY_IP4S
|
||||
# Dictionary variable to store host IPv4 addresses connected to packet
|
||||
# generator interfaces.
|
||||
## @var VPP_IP4S
|
||||
# Dictionary variable to store VPP IPv4 addresses of the packet
|
||||
# generator interfaces.
|
||||
|
||||
## Class method to configure IPv6 addresses on VPP interfaces.
|
||||
#
|
||||
# Set dictionary variables MY_IP6S and VPP_IP6S to IPv6 addresses
|
||||
# calculated using interface VPP interface index as a parameter.
|
||||
# /64 IPv6 prefix is used, with VPP interface address host part set
|
||||
# to ::1 and MY address set to ::2.
|
||||
# Used IPv6 prefix scheme: fd10:{VPP-interface-index}::0/64.
|
||||
# @param cls The class pointer.
|
||||
# @param args List variable to store indices of VPP interfaces.
|
||||
@classmethod
|
||||
def config_ip6(cls, args):
|
||||
for i in args:
|
||||
cls.MY_IP6S[i] = "fd10:%u::2" % i
|
||||
cls.VPP_IP6S[i] = "fd10:%u::1" % i
|
||||
cls.api("sw_interface_add_del_address pg%u %s/64" % (i, cls.VPP_IP6S[i]))
|
||||
cls.log("My IPv6 address is %s" % (cls.MY_IP6S[i]))
|
||||
## @var MY_IP6S
|
||||
# Dictionary variable to store host IPv6 addresses connected to packet
|
||||
# generator interfaces.
|
||||
## @var VPP_IP6S
|
||||
# Dictionary variable to store VPP IPv6 addresses of the packet
|
||||
# generator interfaces.
|
Reference in New Issue
Block a user