VPP-1508: Use scapy.compat to manage packet level library differences.
Change-Id: Icdf6abc9e53d33b26fd1d531c7dda6be0bb9cb55 Signed-off-by: Paul Vinciguerra <pvinci@vinciconsulting.com>
This commit is contained in:
committed by
Ole Trøan
parent
0f6602cb24
commit
a7427ec6f8
@@ -3,12 +3,14 @@ from __future__ import print_function
|
||||
"""ACL plugin - MACIP tests
|
||||
"""
|
||||
import binascii
|
||||
import ipaddress
|
||||
import random
|
||||
from socket import inet_ntop, inet_pton, AF_INET, AF_INET6
|
||||
from struct import pack, unpack
|
||||
import re
|
||||
import unittest
|
||||
|
||||
import scapy.compat
|
||||
from scapy.packet import Raw
|
||||
from scapy.layers.l2 import Ether
|
||||
from scapy.layers.inet import IP, UDP
|
||||
@@ -194,8 +196,10 @@ class MethodHolder(VppTestCase):
|
||||
return acls
|
||||
|
||||
def create_rules(self, mac_type=EXACT_MAC, ip_type=EXACT_IP,
|
||||
acl_count=1, rules_count=[1]):
|
||||
acl_count=1, rules_count=None):
|
||||
acls = []
|
||||
if rules_count is None:
|
||||
rules_count = [1]
|
||||
src_mac = int("220000dead00", 16)
|
||||
for acl in range(2, (acl_count+1) * 2):
|
||||
rules = []
|
||||
@@ -248,7 +252,7 @@ class MethodHolder(VppTestCase):
|
||||
ip4[3] = 0
|
||||
ip6[8] = random.randint(100, 200)
|
||||
ip6[15] = 0
|
||||
ip_pack = ''
|
||||
ip_pack = b''
|
||||
for j in range(0, len(ip)):
|
||||
ip_pack += pack('<B', int(ip[j]))
|
||||
|
||||
@@ -256,8 +260,9 @@ class MethodHolder(VppTestCase):
|
||||
'is_ipv6': is_ip6,
|
||||
'src_ip_addr': ip_pack,
|
||||
'src_ip_prefix_len': ip_len,
|
||||
'src_mac': mac.replace(':', '').decode('hex'),
|
||||
'src_mac_mask': mask.replace(':', '').decode('hex')})
|
||||
'src_mac': binascii.unhexlify(mac.replace(':', '')),
|
||||
'src_mac_mask': binascii.unhexlify(
|
||||
mask.replace(':', ''))})
|
||||
rules.append(rule)
|
||||
if ip_type == self.WILD_IP:
|
||||
break
|
||||
@@ -275,7 +280,7 @@ class MethodHolder(VppTestCase):
|
||||
def verify_macip_acls(self, acl_count, rules_count, expected_count=2):
|
||||
reply = self.macip_acl_dump_debug()
|
||||
for acl in range(2, (acl_count+1) * 2):
|
||||
self.assertEqual(reply[acl - 2].count, rules_count[acl/2-1])
|
||||
self.assertEqual(reply[acl - 2].count, rules_count[acl//2-1])
|
||||
|
||||
self.vapi.macip_acl_interface_get()
|
||||
|
||||
@@ -410,30 +415,32 @@ class MethodHolder(VppTestCase):
|
||||
sub_ip[15] = random.randint(200, 255)
|
||||
elif ip_type == self.SUBNET_IP:
|
||||
if denyIP:
|
||||
sub_ip[2] = str(int(sub_ip[2]) + 1)
|
||||
sub_ip[2] = int(sub_ip[2]) + 1
|
||||
sub_ip[14] = random.randint(100, 199)
|
||||
sub_ip[15] = random.randint(200, 255)
|
||||
src_ip6 = inet_ntop(AF_INET6, str(bytearray(sub_ip)))
|
||||
packed_src_ip6 = b''.join(
|
||||
[scapy.compat.chb(x) for x in sub_ip])
|
||||
src_ip6 = inet_ntop(AF_INET6, packed_src_ip6)
|
||||
packet /= IPv6(src=src_ip6, dst=dst_ip6)
|
||||
else:
|
||||
if ip_type != self.EXACT_IP:
|
||||
sub_ip = ip_rule.split('.')
|
||||
if ip_type == self.WILD_IP:
|
||||
sub_ip[0] = str(random.randint(1, 49))
|
||||
sub_ip[1] = str(random.randint(50, 99))
|
||||
sub_ip[2] = str(random.randint(100, 199))
|
||||
sub_ip[3] = str(random.randint(200, 255))
|
||||
sub_ip[0] = random.randint(1, 49)
|
||||
sub_ip[1] = random.randint(50, 99)
|
||||
sub_ip[2] = random.randint(100, 199)
|
||||
sub_ip[3] = random.randint(200, 255)
|
||||
elif ip_type == self.SUBNET_IP:
|
||||
if denyIP:
|
||||
sub_ip[1] = str(int(sub_ip[1])+1)
|
||||
sub_ip[2] = str(random.randint(100, 199))
|
||||
sub_ip[3] = str(random.randint(200, 255))
|
||||
src_ip4 = ".".join(sub_ip)
|
||||
sub_ip[1] = int(sub_ip[1])+1
|
||||
sub_ip[2] = random.randint(100, 199)
|
||||
sub_ip[3] = random.randint(200, 255)
|
||||
src_ip4 = '.'.join(['{!s}'.format(x) for x in sub_ip])
|
||||
packet /= IP(src=src_ip4, dst=dst_ip4, frag=0, flags=0)
|
||||
|
||||
packet /= UDP(sport=src_port, dport=dst_port)/Raw(payload)
|
||||
|
||||
packet[Raw].load += " mac:"+src_mac
|
||||
packet[Raw].load += b" mac:%s" % scapy.compat.raw(src_mac)
|
||||
|
||||
size = self.pg_if_packet_sizes[p % len(self.pg_if_packet_sizes)]
|
||||
if isinstance(src_if, VppSubInterface):
|
||||
@@ -485,7 +492,9 @@ class MethodHolder(VppTestCase):
|
||||
sub_ip = list(unpack('<16B', inet_pton(AF_INET6, ip)))
|
||||
for i in range(8, 16):
|
||||
sub_ip[i] = 0
|
||||
ip = inet_ntop(AF_INET6, str(bytearray(sub_ip)))
|
||||
packed_ip = b''.join(
|
||||
[scapy.compat.chb(x) for x in sub_ip])
|
||||
ip = inet_ntop(AF_INET6, packed_ip)
|
||||
else:
|
||||
if ip_type == self.WILD_IP:
|
||||
ip = "0.0.0.0"
|
||||
@@ -540,8 +549,9 @@ class MethodHolder(VppTestCase):
|
||||
'is_ipv6': is_ip6,
|
||||
'src_ip_addr': ip_rule,
|
||||
'src_ip_prefix_len': prefix_len,
|
||||
'src_mac': mac_rule.replace(':', '').decode('hex'),
|
||||
'src_mac_mask': mac_mask.replace(':', '').decode('hex')})
|
||||
'src_mac': binascii.unhexlify(mac_rule.replace(':', '')),
|
||||
'src_mac_mask': binascii.unhexlify(
|
||||
mac_mask.replace(':', ''))})
|
||||
macip_rules.append(macip_rule)
|
||||
|
||||
# deny all other packets
|
||||
|
||||
Reference in New Issue
Block a user