test: factor out "L4_Conn" into a class within util.py (VPP-931)

It seems a useful abstraction for the purposes of writing
fine-grained tests, to be able to create a "connection" object
which would be bound to two VPP interfaces, and hold some
information about the state, allowing to send the packets
back and forth with minimal amount of arguments.

Change-Id: Idb83b6b82b38bded5b7e1756a41bb2df4cd58e3a
Signed-off-by: Andrew Yourtchenko <ayourtch@gmail.com>
This commit is contained in:
Andrew Yourtchenko
2017-09-07 13:22:24 +02:00
committed by Damjan Marion
parent 7d6412e66d
commit 92dc12a01b
2 changed files with 68 additions and 50 deletions

View File

@ -13,6 +13,7 @@ from scapy.layers.inet6 import ICMPv6EchoReply, IPv6ExtHdrRouting
from scapy.layers.inet6 import IPv6ExtHdrFragment
from pprint import pprint
from random import randint
from util import L4_Conn
def to_acl_rule(self, is_permit, wildcard_sport=False):
@ -68,37 +69,7 @@ class IterateWithSleep():
self.testcase.sleep(self.sleep_sec)
class Conn():
def __init__(self, testcase, if1, if2, af, l4proto, port1, port2):
self.testcase = testcase
self.ifs = [None, None]
self.ifs[0] = if1
self.ifs[1] = if2
self.address_family = af
self.l4proto = l4proto
self.ports = [None, None]
self.ports[0] = port1
self.ports[1] = port2
self
def pkt(self, side, flags=None):
is_ip6 = 1 if self.address_family == AF_INET6 else 0
s0 = side
s1 = 1-side
src_if = self.ifs[s0]
dst_if = self.ifs[s1]
layer_3 = [IP(src=src_if.remote_ip4, dst=dst_if.remote_ip4),
IPv6(src=src_if.remote_ip6, dst=dst_if.remote_ip6)]
payload = "x"
l4args = {'sport': self.ports[s0], 'dport': self.ports[s1]}
if flags is not None:
l4args['flags'] = flags
p = (Ether(dst=src_if.local_mac, src=src_if.remote_mac) /
layer_3[is_ip6] /
self.l4proto(**l4args) /
Raw(payload))
return p
class Conn(L4_Conn):
def apply_acls(self, reflect_side, acl_side):
pkts = []
pkts.append(self.pkt(0))
@ -152,25 +123,6 @@ class Conn():
}
return new_rule
def send(self, side, flags=None):
self.ifs[side].add_stream(self.pkt(side, flags))
self.ifs[1-side].enable_capture()
self.testcase.pg_start()
def recv(self, side):
p = self.ifs[side].wait_for_packet(1)
return p
def send_through(self, side, flags=None):
self.send(side, flags)
p = self.recv(1-side)
return p
def send_pingpong(self, side, flags1=None, flags2=None):
p1 = self.send_through(side, flags1)
p2 = self.send_through(1-side, flags2)
return [p1, p2]
@unittest.skipUnless(running_extended_tests(), "part of extended tests")
class ACLPluginConnTestCase(VppTestCase):

View File

@ -6,6 +6,13 @@ from abc import abstractmethod, ABCMeta
from cStringIO import StringIO
from scapy.layers.inet6 import in6_mactoifaceid
from scapy.layers.l2 import Ether
from scapy.packet import Raw
from scapy.layers.inet import IP, UDP, TCP
from scapy.layers.inet6 import IPv6, ICMPv6Unknown, ICMPv6EchoRequest
from scapy.packet import Packet
from socket import inet_pton, AF_INET, AF_INET6
def ppp(headline, packet):
""" Return string containing the output of scapy packet.show() call. """
@ -163,3 +170,62 @@ class ForeignAddressFactory(object):
raise Exception("Network host address exhaustion")
self.count += 1
return self.net_template.format(self.count)
class L4_Conn():
""" L4 'connection' tied to two VPP interfaces """
def __init__(self, testcase, if1, if2, af, l4proto, port1, port2):
self.testcase = testcase
self.ifs = [None, None]
self.ifs[0] = if1
self.ifs[1] = if2
self.address_family = af
self.l4proto = l4proto
self.ports = [None, None]
self.ports[0] = port1
self.ports[1] = port2
self
def pkt(self, side, l4args={}, payload="x"):
is_ip6 = 1 if self.address_family == AF_INET6 else 0
s0 = side
s1 = 1-side
src_if = self.ifs[s0]
dst_if = self.ifs[s1]
layer_3 = [IP(src=src_if.remote_ip4, dst=dst_if.remote_ip4),
IPv6(src=src_if.remote_ip6, dst=dst_if.remote_ip6)]
merged_l4args = {'sport': self.ports[s0], 'dport': self.ports[s1]}
merged_l4args.update(l4args)
p = (Ether(dst=src_if.local_mac, src=src_if.remote_mac) /
layer_3[is_ip6] /
self.l4proto(**merged_l4args) /
Raw(payload))
return p
def send(self, side, flags=None, payload=""):
l4args = {}
if flags is not None:
l4args['flags'] = flags
self.ifs[side].add_stream(self.pkt(side,
l4args=l4args, payload=payload))
self.ifs[1-side].enable_capture()
self.testcase.pg_start()
def recv(self, side):
p = self.ifs[side].wait_for_packet(1)
return p
def send_through(self, side, flags=None, payload=""):
self.send(side, flags, payload)
p = self.recv(1-side)
return p
def send_pingpong(self, side, flags1=None, flags2=None):
p1 = self.send_through(side, flags1)
p2 = self.send_through(1-side, flags2)
return [p1, p2]
class L4_CONN_SIDE:
L4_CONN_SIDE_ZERO = 0
L4_CONN_SIDE_ONE = 1