make test: Use VXLAN built in scapy 2.3.3
- fix documentation issues. - fix mpls test. Change-Id: Ieef6b4b5e4aca99e89bd03e45a991be89d42adba Signed-off-by: Matej Klotton <mklotton@cisco.com>
This commit is contained in:
@ -1,17 +0,0 @@
|
||||
from scapy.fields import BitField, XByteField, X3BytesField
|
||||
from scapy.packet import Packet, bind_layers
|
||||
from scapy.layers.l2 import Ether
|
||||
from scapy.layers.inet import UDP
|
||||
|
||||
|
||||
class VXLAN(Packet):
|
||||
name = "VXLAN"
|
||||
fields_desc = [BitField("flags", 0x08000000, 32),
|
||||
X3BytesField("vni", 0),
|
||||
XByteField("reserved", 0x00)]
|
||||
|
||||
def mysummary(self):
|
||||
return self.sprintf("VXLAN (vni=%VXLAN.vni%)")
|
||||
|
||||
bind_layers(UDP, VXLAN, dport=4789)
|
||||
bind_layers(VXLAN, Ether)
|
@ -366,16 +366,17 @@ class TestL2bdMultiInst(VppTestCase):
|
||||
|
||||
def run_verify_test(self):
|
||||
"""
|
||||
Create packet streams for all configured l2-pg interfaces, send all
|
||||
Create packet streams for all configured l2-pg interfaces, send all \
|
||||
prepared packet streams and verify that:
|
||||
- all packets received correctly on all pg-l2 interfaces assigned \
|
||||
to bridge domains
|
||||
- no packet received on all pg-l2 interfaces not assigned to \
|
||||
bridge domains
|
||||
- all packets received correctly on all pg-l2 interfaces assigned
|
||||
to bridge domains
|
||||
- no packet received on all pg-l2 interfaces not assigned to
|
||||
bridge domains
|
||||
|
||||
:raise: RuntimeError if no packet captured on l2-pg interface assigned \
|
||||
to the bridge domain or if any packet is captured on l2-pg interface \
|
||||
not assigned to the bridge domain.
|
||||
:raise RuntimeError: if no packet captured on l2-pg interface assigned
|
||||
to the bridge domain or if any packet is captured
|
||||
on l2-pg interface not assigned to the bridge
|
||||
domain.
|
||||
"""
|
||||
# Test
|
||||
# Create incoming packet streams for packet-generator interfaces
|
||||
|
@ -266,16 +266,17 @@ class TestL2xcMultiInst(VppTestCase):
|
||||
|
||||
def run_verify_test(self):
|
||||
"""
|
||||
Create packet streams for all configured l2-pg interfaces, send all
|
||||
Create packet streams for all configured l2-pg interfaces, send all \
|
||||
prepared packet streams and verify that:
|
||||
- all packets received correctly on all pg-l2 interfaces assigned \
|
||||
to cross-connects
|
||||
- no packet received on all pg-l2 interfaces not assigned to \
|
||||
cross-connects
|
||||
- all packets received correctly on all pg-l2 interfaces assigned
|
||||
to cross-connects
|
||||
- no packet received on all pg-l2 interfaces not assigned to
|
||||
cross-connects
|
||||
|
||||
:raise: RuntimeError if no packet captured on l2-pg interface assigned \
|
||||
to the cross-connect or if any packet is captured on l2-pg interface \
|
||||
not assigned to the cross-connect.
|
||||
:raise RuntimeError: if no packet captured on l2-pg interface assigned
|
||||
to the cross-connect or if any packet is captured
|
||||
on l2-pg interface not assigned to the
|
||||
cross-connect.
|
||||
"""
|
||||
# Test
|
||||
# Create incoming packet streams for packet-generator interfaces
|
||||
|
@ -4,7 +4,6 @@ import unittest
|
||||
import socket
|
||||
|
||||
from framework import VppTestCase, VppTestRunner
|
||||
from vpp_sub_interface import VppSubInterface, VppDot1QSubint, VppDot1ADSubint
|
||||
from vpp_ip_route import IpRoute, RoutePath, MplsRoute, MplsIpBind
|
||||
|
||||
from scapy.packet import Raw
|
||||
@ -15,7 +14,6 @@ from scapy.contrib.mpls import MPLS
|
||||
from util import ppp
|
||||
|
||||
|
||||
|
||||
class TestMPLS(VppTestCase):
|
||||
""" MPLS Test Case """
|
||||
|
||||
@ -97,11 +95,12 @@ class TestMPLS(VppTestCase):
|
||||
pkts.append(p)
|
||||
return pkts
|
||||
|
||||
def verify_filter(self, capture, sent):
|
||||
@staticmethod
|
||||
def verify_filter(capture, sent):
|
||||
if not len(capture) == len(sent):
|
||||
# filter out any IPv6 RAs from the captur
|
||||
# filter out any IPv6 RAs from the capture
|
||||
for p in capture:
|
||||
if (p.haslayer(IPv6)):
|
||||
if p.haslayer(IPv6):
|
||||
capture.remove(p)
|
||||
return capture
|
||||
|
||||
@ -327,8 +326,8 @@ class TestMPLS(VppTestCase):
|
||||
try:
|
||||
self.assertEqual(0, len(rx))
|
||||
except:
|
||||
error("MPLS non-EOS packets popped and forwarded")
|
||||
error(packet.show())
|
||||
self.logger.error("MPLS non-EOS packets popped and forwarded")
|
||||
self.logger.error(ppp("", rx))
|
||||
raise
|
||||
|
||||
#
|
||||
@ -352,7 +351,8 @@ class TestMPLS(VppTestCase):
|
||||
self.verify_capture_labelled_ip4(self.pg0, rx, tx, [33, 44, 45])
|
||||
|
||||
#
|
||||
# A recursive non-EOS x-connect, which resolves through another x-connect
|
||||
# A recursive non-EOS x-connect, which resolves through another
|
||||
# x-connect
|
||||
#
|
||||
route_34_neos = MplsRoute(self, 34, 0,
|
||||
[RoutePath("0.0.0.0",
|
||||
@ -369,11 +369,12 @@ class TestMPLS(VppTestCase):
|
||||
self.pg_start()
|
||||
|
||||
rx = self.pg0.get_capture()
|
||||
# it's the 2nd (counting from 0) lael in the stack that is swapped
|
||||
# it's the 2nd (counting from 0) label in the stack that is swapped
|
||||
self.verify_capture_labelled(self.pg0, rx, tx, [33, 44, 46, 99], num=2)
|
||||
|
||||
#
|
||||
# an recursive IP route that resolves through the recursive non-eos x-connect
|
||||
# an recursive IP route that resolves through the recursive non-eos
|
||||
# x-connect
|
||||
#
|
||||
ip_10_0_0_1 = IpRoute(self, "10.0.0.1", 32,
|
||||
[RoutePath("0.0.0.0",
|
||||
@ -505,7 +506,7 @@ class TestMPLS(VppTestCase):
|
||||
self.verify_capture_labelled_ip4(self.pg0, rx, tx, [32, 33, 34])
|
||||
|
||||
#
|
||||
# add a recursive path, with ouput label, via the 1 label route
|
||||
# add a recursive path, with output label, via the 1 label route
|
||||
#
|
||||
route_11_0_0_1 = IpRoute(self, "11.0.0.1", 32,
|
||||
[RoutePath("10.0.0.1",
|
||||
@ -567,14 +568,16 @@ class TestMPLS(VppTestCase):
|
||||
#
|
||||
nh_addr = socket.inet_pton(socket.AF_INET, self.pg0.remote_ip4)
|
||||
|
||||
reply = self.vapi.mpls_tunnel_add_del(0xffffffff, # don't know the if index yet
|
||||
1, # IPv4 next-hop
|
||||
nh_addr,
|
||||
self.pg0.sw_if_index,
|
||||
0, # next-hop-table-id
|
||||
1, # next-hop-weight
|
||||
2, # num-out-labels,
|
||||
[44, 46])
|
||||
reply = self.vapi.mpls_tunnel_add_del(
|
||||
0xffffffff, # don't know the if index yet
|
||||
1, # IPv4 next-hop
|
||||
nh_addr,
|
||||
self.pg0.sw_if_index,
|
||||
0, # next-hop-table-id
|
||||
1, # next-hop-weight
|
||||
2, # num-out-labels,
|
||||
[44, 46]
|
||||
)
|
||||
self.vapi.sw_interface_set_flags(reply.sw_if_index, admin_up_down=1)
|
||||
|
||||
#
|
||||
@ -584,15 +587,17 @@ class TestMPLS(VppTestCase):
|
||||
nh_addr = socket.inet_pton(socket.AF_INET, "0.0.0.0")
|
||||
dest_addr_len = 32
|
||||
|
||||
self.vapi.ip_add_del_route(dest_addr,
|
||||
dest_addr_len,
|
||||
nh_addr, # all zeros next-hop - tunnel is p2p
|
||||
reply.sw_if_index, # sw_if_index of the new tunnel
|
||||
0, # table-id
|
||||
0, # next-hop-table-id
|
||||
1, # next-hop-weight
|
||||
0, # num-out-labels,
|
||||
[]) # out-label
|
||||
self.vapi.ip_add_del_route(
|
||||
dest_addr,
|
||||
dest_addr_len,
|
||||
nh_addr, # all zeros next-hop - tunnel is p2p
|
||||
reply.sw_if_index, # sw_if_index of the new tunnel
|
||||
0, # table-id
|
||||
0, # next-hop-table-id
|
||||
1, # next-hop-weight
|
||||
0, # num-out-labels,
|
||||
[] # out-label
|
||||
)
|
||||
|
||||
self.vapi.cli("clear trace")
|
||||
tx = self.create_stream_ip4(self.pg0, "10.0.0.3")
|
||||
|
@ -6,7 +6,7 @@ from template_bd import BridgeDomain
|
||||
|
||||
from scapy.layers.l2 import Ether
|
||||
from scapy.layers.inet import IP, UDP
|
||||
from scapy_handlers.vxlan import VXLAN
|
||||
from scapy.layers.vxlan import VXLAN
|
||||
|
||||
|
||||
class TestVxlan(BridgeDomain, VppTestCase):
|
||||
@ -24,13 +24,15 @@ class TestVxlan(BridgeDomain, VppTestCase):
|
||||
return (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
|
||||
IP(src=self.pg0.remote_ip4, dst=self.pg0.local_ip4) /
|
||||
UDP(sport=self.dport, dport=self.dport, chksum=0) /
|
||||
VXLAN(vni=self.vni) /
|
||||
VXLAN(vni=self.vni, flags=self.flags) /
|
||||
pkt)
|
||||
|
||||
def decapsulate(self, pkt):
|
||||
"""
|
||||
Decapsulate the original payload frame by removing VXLAN header
|
||||
"""
|
||||
# check if is set I flag
|
||||
self.assertEqual(pkt[VXLAN].flags, int('0x8', 16))
|
||||
return pkt[VXLAN].payload
|
||||
|
||||
# Method for checking VXLAN encapsulation.
|
||||
@ -62,6 +64,7 @@ class TestVxlan(BridgeDomain, VppTestCase):
|
||||
|
||||
try:
|
||||
cls.dport = 4789
|
||||
cls.flags = 0x8
|
||||
cls.vni = 1
|
||||
|
||||
# Create 2 pg interfaces.
|
||||
@ -95,5 +98,6 @@ class TestVxlan(BridgeDomain, VppTestCase):
|
||||
if not self.vpp_dead:
|
||||
self.logger.info(self.vapi.cli("show bridge-domain 1 detail"))
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
unittest.main(testRunner=VppTestRunner)
|
||||
|
@ -581,8 +581,7 @@ class VppPapiProvider(object):
|
||||
|
||||
:param sw_if_index_from:
|
||||
:param sw_if_index_to:
|
||||
:param enable
|
||||
|
||||
:param state:
|
||||
"""
|
||||
return self.api(self.papi.sw_interface_span_enable_disable,
|
||||
{'sw_if_index_from': sw_if_index_from,
|
||||
|
Reference in New Issue
Block a user