tests: update scapy to version 2.4.5

- Required for Ubuntu 24.04 LTS jobs
- temporarily disable TestIpsecEsp1 and
  TestIpsecAhAll tests until a patch can
  be added to fix them

Type: test

Change-Id: I1ae7b170117182c3252629bbbb770775e2c496c9
Signed-off-by: Benoît Ganne <bganne@cisco.com>
Signed-off-by: Dave Wallace <dwallacelf@gmail.com>
This commit is contained in:
Dave Wallace 2024-07-23 01:28:19 -04:00 committed by Beno�t Ganne
parent 0a3b0b231d
commit cf9356d642
30 changed files with 610 additions and 306 deletions

View File

@ -90,16 +90,20 @@ DEB_DEPENDS += jq # for extracting test summary from .json report (hs-test)
LIBFFI=libffi6 # works on all but 20.04 and debian-testing
ifeq ($(OS_VERSION_ID),24.04)
DEB_DEPENDS += libssl-dev
DEB_DEPENDS += llvm clang clang-format-14
DEB_DEPENDS += libssl-dev
DEB_DEPENDS += llvm clang clang-format-15
# overwrite clang-format version to run `make checkstyle` successfully
export CLANG_FORMAT_VER=14
LIBFFI=libffi8
DEB_DEPENDS += enchant-2 # for docs
# TODO: remove once ubuntu 20.04 is deprecated and extras/scripts/checkstyle.sh is upgraded to 15
export CLANG_FORMAT_VER=15
LIBFFI=libffi8
DEB_DEPENDS += enchant-2 # for docs
else ifeq ($(OS_VERSION_ID),22.04)
DEB_DEPENDS += python3-virtualenv
DEB_DEPENDS += libssl-dev
DEB_DEPENDS += clang clang-format-11
DEB_DEPENDS += clang clang-format-15
# overwrite clang-format version to run `make checkstyle` successfully
# TODO: remove once ubuntu 20.04 is deprecated and extras/scripts/checkstyle.sh is upgraded to 15
export CLANG_FORMAT_VER=15
LIBFFI=libffi7
DEB_DEPENDS += enchant-2 # for docs
else ifeq ($(OS_VERSION_ID),20.04)
@ -108,9 +112,6 @@ else ifeq ($(OS_VERSION_ID),20.04)
DEB_DEPENDS += clang clang-format-11
LIBFFI=libffi7
DEB_DEPENDS += enchant-2 # for docs
else ifeq ($(OS_VERSION_ID),20.10)
DEB_DEPENDS += clang clang-format-11
LIBFFI=libffi8ubuntu1
else ifeq ($(OS_ID)-$(OS_VERSION_ID),debian-10)
DEB_DEPENDS += virtualenv
else ifeq ($(OS_ID)-$(OS_VERSION_ID),debian-11)
@ -119,9 +120,10 @@ else ifeq ($(OS_ID)-$(OS_VERSION_ID),debian-11)
LIBFFI=libffi7
else ifeq ($(OS_ID)-$(OS_VERSION_ID),debian-12)
DEB_DEPENDS += virtualenv
DEB_DEPENDS += clang-14 clang-format-14
DEB_DEPENDS += clang-14 clang-format-15
# for extras/scripts/checkstyle.sh
export CLANG_FORMAT_VER=14
# TODO: remove once ubuntu 20.04 is deprecated and extras/scripts/checkstyle.sh is upgraded to -15
export CLANG_FORMAT_VER=15
LIBFFI=libffi8
else
DEB_DEPENDS += clang-11 clang-format-11

View File

@ -19,7 +19,7 @@ CLANG_FORMAT_VER_REGEX='([0-9]+)\.[0-9]+\.[0-9]+'
CLANG_FORMAT_DIFF="/usr/share/clang/clang-format-diff.py"
# TODO: Remove clang-format-${CLANG_FORMAT_VER} from 'make install-deps' when
# CLANG_FORMAT_VER default value is upgraded
# CLANG_FORMAT_VER default value is upgraded to 15 after Ubuntu-20.04 is deprecated
CLANG_FORMAT_VER=${CLANG_FORMAT_VER:-11}
GIT_DIFF_ARGS="-U0 --no-color --relative HEAD~1"
GIT_DIFF_EXCLUDE_LIST=(
@ -30,16 +30,20 @@ CLANG_FORMAT_DIFF_ARGS="-style file -p1"
SUFFIX="-${CLANG_FORMAT_VER}"
# Attempt to find clang-format to confirm Clang version.
if command -v clang-format${SUFFIX} &> /dev/null;
if command -v "clang-format${SUFFIX}" &> /dev/null;
then
CLANG_FORMAT=clang-format${SUFFIX}
elif command -v clang-format &> /dev/null;
then
CLANG_FORMAT=clang-format
CLANG_FORMAT="clang-format${SUFFIX}"
else
echo "*******************************************************************"
echo "* CHECKSTYLE FAILED"
echo "* Could not locate the clang-format${SUFFIX} script"
echo "* Run 'make install-deps' to install it"
echo "*******************************************************************"
exit 1
fi
CLANG_FORMAT_VERSION=$(${CLANG_FORMAT} --version)
echo $CLANG_FORMAT_VERSION
echo "$CLANG_FORMAT_VERSION"
# Confirm that Clang is the expected version.
if [[ ! $CLANG_FORMAT_VERSION =~ $CLANG_FORMAT_VER_REGEX ]];
@ -75,6 +79,7 @@ then
echo "*******************************************************************"
echo "* CHECKSTYLE FAILED"
echo "* Could not locate the clang-format-diff script"
echo "* Run 'make install-deps' to install it"
echo "*******************************************************************"
exit 1
fi

View File

@ -74,12 +74,13 @@ V=0
endif
PYTHON_VERSION=$(shell $(PYTHON_INTERP) -c 'import sys; print(sys.version_info.major)')
PIP_VERSION=24.0
PIP_VERSION=24.2
# Keep in sync with requirements.txt
PIP_TOOLS_VERSION=7.4.1
PIP_SETUPTOOLS_VERSION=69.2.0
PIP_SETUPTOOLS_VERSION=71.1.0
PYTHON_DEPENDS=requirements-$(PYTHON_VERSION).txt
SCAPY_SOURCE=$(shell find $(VENV_PATH)/lib/python* -name site-packages)
SCAPY_VERSION=$(shell grep scapy $(TEST_DIR)/requirements.txt | cut -d'=' -f3 | cut -d';' -f1)
BUILD_COV_DIR=$(BR)/test-coverage
PIP_TOOLS_INSTALL_DONE=$(VENV_RUN_DIR)/pip-tools-install-$(PYTHON_VERSION)-$(PIP_TOOLS_VERSION).done
@ -118,7 +119,7 @@ $(PIP_INSTALL_DONE): $(PIP_TOOLS_INSTALL_DONE) $(PYTHON_DEPENDS)
$(PIP_PATCH_DONE): $(PIP_INSTALL_DONE)
@echo --- patching ---
@sleep 1 # Ensure python recompiles patched *.py files -> *.pyc
for f in $(CURDIR)/patches/scapy-2.4.3/*.patch ; do \
for f in $(CURDIR)/patches/scapy-$(SCAPY_VERSION)/*.patch ; do \
echo Applying patch: $$(basename $$f) ; \
patch --forward -p1 -d $(SCAPY_SOURCE) < $$f ; \
retCode=$$?; \

View File

@ -71,6 +71,9 @@ class _PacketInfo(object):
#: Store the copy of the former packet.
data = None
def __repr__(self):
return f"_PacketInfo index:{self.index} src:{self.src} dst:{self.dst} ip:{self.ip} proto:{self.proto} data:{self.data}"
def __eq__(self, other):
index = self.index == other.index
src = self.src == other.src

View File

@ -0,0 +1,14 @@
diff --git a/scapy/contrib/cdp.py b/scapy/contrib/cdp.py
index a1532b78..83963ff4 100644
--- a/scapy/contrib/cdp.py
+++ b/scapy/contrib/cdp.py
@@ -392,7 +392,7 @@ class _CDPChecksum:
This padding is only used for checksum computation. The original
packet should not be altered."""
if len(pkt) % 2:
- last_chr = orb(pkt[-1])
+ last_chr = orb(pkt[len(pkt)-1:])
if last_chr <= 0x80:
return pkt[:-1] + b'\x00' + chb(last_chr)
else:

View File

@ -0,0 +1,22 @@
diff --git a/scapy/contrib/ikev2.py b/scapy/contrib/ikev2.py
index 7799fd1e..f81af7ac 100644
--- a/scapy/contrib/ikev2.py
+++ b/scapy/contrib/ikev2.py
@@ -607,12 +607,16 @@ class IKEv2_payload_TSr(IKEv2_class):
class IKEv2_payload_Delete(IKEv2_class):
name = "IKEv2 Vendor ID"
+ name = "IKEv2 delete payload"
overload_fields = {IKEv2: {"next_payload": 42}}
fields_desc = [
ByteEnumField("next_payload", None, IKEv2_payload_type),
ByteField("res", 0),
- FieldLenField("length", None, "vendorID", "H", adjust=lambda pkt, x:x + 4), # noqa: E501
- StrLenField("vendorID", "", length_from=lambda x:x.length - 4),
+ FieldLenField("length", None, "SPIs", "H", adjust=lambda pkt, x:x + 8), # noqa: E501
+ ByteEnumField("proto", 1, {1: "IKEv2", 2: "AH", 3: "ESP"}),
+ ByteField("SPIsize", 0),
+ ShortField("SPInum", 0),
+ StrLenField("SPIs", "", length_from=lambda x: x.length - 8),
]

View File

@ -0,0 +1,212 @@
diff --git a/scapy/layers/ipsec.py b/scapy/layers/ipsec.py
index 8251dc14..a8390fc5 100644
--- a/scapy/layers/ipsec.py
+++ b/scapy/layers/ipsec.py
@@ -60,7 +60,7 @@ import scapy.modules.six as six
from scapy.modules.six.moves import range
from scapy.layers.inet6 import IPv6, IPv6ExtHdrHopByHop, IPv6ExtHdrDestOpt, \
IPv6ExtHdrRouting
-
+from scapy.contrib.mpls import MPLS
###############################################################################
class AH(Packet):
@@ -360,13 +360,16 @@ class CryptAlgo(object):
encryptor = cipher.encryptor()
if self.is_aead:
- if esn_en:
- aad = struct.pack('!LLL', esp.spi, esn, esp.seq)
- else:
- aad = struct.pack('!LL', esp.spi, esp.seq)
+ aad = sa.build_aead(esp)
+ if self.name == 'AES-NULL-GMAC':
+ aad = aad + esp.iv + data
+ aes_null_gmac_data = data
+ data = b''
encryptor.authenticate_additional_data(aad)
data = encryptor.update(data) + encryptor.finalize()
data += encryptor.tag[:self.icv_size]
+ if self.name == 'AES-NULL-GMAC':
+ data = aes_null_gmac_data + data
else:
data = encryptor.update(data) + encryptor.finalize()
@@ -402,16 +405,18 @@ class CryptAlgo(object):
if self.is_aead:
# Tag value check is done during the finalize method
- if esn_en:
- decryptor.authenticate_additional_data(
- struct.pack('!LLL', esp.spi, esn, esp.seq))
- else:
- decryptor.authenticate_additional_data(
- struct.pack('!LL', esp.spi, esp.seq))
+ aad = sa.build_aead(esp)
+ if self.name == 'AES-NULL-GMAC':
+ aad = aad + iv + data
+ aes_null_gmac_data = data
+ data = b''
+ decryptor.authenticate_additional_data(aad)
try:
data = decryptor.update(data) + decryptor.finalize()
except InvalidTag as err:
raise IPSecIntegrityError(err)
+ if self.name == 'AES-NULL-GMAC':
+ data = aes_null_gmac_data + data
# extract padlen and nh
padlen = orb(data[-2])
@@ -458,6 +463,13 @@ if algorithms:
iv_size=8,
icv_size=16,
format_mode_iv=_salt_format_mode_iv)
+ CRYPT_ALGOS['AES-NULL-GMAC'] = CryptAlgo('AES-NULL-GMAC',
+ cipher=algorithms.AES,
+ mode=modes.GCM,
+ salt_size=4,
+ iv_size=8,
+ icv_size=16,
+ format_mode_iv=_salt_format_mode_iv)
if hasattr(modes, 'CCM'):
CRYPT_ALGOS['AES-CCM'] = CryptAlgo('AES-CCM',
cipher=algorithms.AES,
@@ -546,7 +558,7 @@ class AuthAlgo(object):
else:
return self.mac(key, self.digestmod(), default_backend())
- def sign(self, pkt, key, esn_en=False, esn=0):
+ def sign(self, pkt, key, trailer=None, esn_en=False, esn=0):
"""
Sign an IPsec (ESP or AH) packet with this algo.
@@ -565,6 +577,8 @@ class AuthAlgo(object):
if pkt.haslayer(ESP):
mac.update(raw(pkt[ESP]))
+ if trailer:
+ mac.update(trailer)
pkt[ESP].data += mac.finalize()[:self.icv_size]
elif pkt.haslayer(AH):
@@ -574,11 +588,13 @@ class AuthAlgo(object):
else:
temp = raw(clone)
mac.update(temp)
+ if trailer:
+ mac.update(trailer)
pkt[AH].icv = mac.finalize()[:self.icv_size]
return pkt
- def verify(self, pkt, key, esn_en=False, esn=0):
+ def verify(self, pkt, key, trailer, esn_en=False, esn=0):
"""
Check that the integrity check value (icv) of a packet is valid.
@@ -617,6 +633,8 @@ class AuthAlgo(object):
temp = raw(clone)
mac.update(temp)
+ if trailer:
+ mac.update(trailer) # bytearray(4)) #raw(trailer))
computed_icv = mac.finalize()[:self.icv_size]
# XXX: Cannot use mac.verify because the ICV can be truncated
@@ -805,7 +823,7 @@ class SecurityAssociation(object):
This class is responsible of "encryption" and "decryption" of IPsec packets. # noqa: E501
"""
- SUPPORTED_PROTOS = (IP, IPv6)
+ SUPPORTED_PROTOS = (IP, IPv6, MPLS)
def __init__(self, proto, spi, seq_num=1, crypt_algo=None, crypt_key=None,
auth_algo=None, auth_key=None, tunnel_header=None, nat_t_header=None, esn_en=False, esn=0): # noqa: E501
@@ -880,6 +898,23 @@ class SecurityAssociation(object):
raise TypeError('nat_t_header must be %s' % UDP.name)
self.nat_t_header = nat_t_header
+ def build_aead(self, esp):
+ if self.esn_en:
+ return (struct.pack('!LLL', esp.spi, self.seq_num >> 32, esp.seq))
+ else:
+ return (struct.pack('!LL', esp.spi, esp.seq))
+
+ def build_seq_num(self, num):
+ # only lower order bits are transmitted
+ # higher order bits are used in the ICV
+ lower = num & 0xffffffff
+ upper = num >> 32
+
+ if self.esn_en:
+ return lower, struct.pack("!I", upper)
+ else:
+ return lower, None
+
def check_spi(self, pkt):
if pkt.spi != self.spi:
raise TypeError('packet spi=0x%x does not match the SA spi=0x%x' %
@@ -893,7 +928,8 @@ class SecurityAssociation(object):
if len(iv) != self.crypt_algo.iv_size:
raise TypeError('iv length must be %s' % self.crypt_algo.iv_size) # noqa: E501
- esp = _ESPPlain(spi=self.spi, seq=seq_num or self.seq_num, iv=iv)
+ low_seq_num, high_seq_num = self.build_seq_num(seq_num or self.seq_num)
+ esp = _ESPPlain(spi=self.spi, seq=low_seq_num, iv=iv)
if self.tunnel_header:
tunnel = self.tunnel_header.copy()
@@ -917,7 +953,7 @@ class SecurityAssociation(object):
esn_en=esn_en or self.esn_en,
esn=esn or self.esn)
- self.auth_algo.sign(esp, self.auth_key)
+ self.auth_algo.sign(esp, self.auth_key, high_seq_num)
if self.nat_t_header:
nat_t_header = self.nat_t_header.copy()
@@ -944,7 +980,8 @@ class SecurityAssociation(object):
def _encrypt_ah(self, pkt, seq_num=None, esn_en=False, esn=0):
- ah = AH(spi=self.spi, seq=seq_num or self.seq_num,
+ low_seq_num, high_seq_num = self.build_seq_num(seq_num or self.seq_num)
+ ah = AH(spi=self.spi, seq=low_seq_num,
icv=b"\x00" * self.auth_algo.icv_size)
if self.tunnel_header:
@@ -985,7 +1022,7 @@ class SecurityAssociation(object):
ip_header.plen = len(ip_header.payload) + len(ah) + len(payload)
signed_pkt = self.auth_algo.sign(ip_header / ah / payload,
- self.auth_key,
+ self.auth_key, high_seq_num,
esn_en=esn_en or self.esn_en,
esn=esn or self.esn)
@@ -1025,11 +1062,12 @@ class SecurityAssociation(object):
def _decrypt_esp(self, pkt, verify=True, esn_en=None, esn=None):
+ low_seq_num, high_seq_num = self.build_seq_num(self.seq_num)
encrypted = pkt[ESP]
if verify:
self.check_spi(pkt)
- self.auth_algo.verify(encrypted, self.auth_key)
+ self.auth_algo.verify(encrypted, self.auth_key, high_seq_num)
esp = self.crypt_algo.decrypt(self, encrypted, self.crypt_key,
self.crypt_algo.icv_size or
@@ -1070,9 +1108,10 @@ class SecurityAssociation(object):
def _decrypt_ah(self, pkt, verify=True, esn_en=None, esn=None):
+ low_seq_num, high_seq_num = self.build_seq_num(self.seq_num)
if verify:
self.check_spi(pkt)
- self.auth_algo.verify(pkt, self.auth_key,
+ self.auth_algo.verify(pkt, self.auth_key, high_seq_num,
esn_en=esn_en or self.esn_en,
esn=esn or self.esn)

View File

@ -0,0 +1,45 @@
# NOTE: This patch copied from https://github.com/secdev/scapy
# commit 3e6900776698cd5472c5405294414d5b672a3f18
#
diff --git a/scapy/layers/ppp.py b/scapy/layers/ppp.py
index b5cd42b4..e0f4c593 100644
--- a/scapy/layers/ppp.py
+++ b/scapy/layers/ppp.py
@@ -292,6 +292,14 @@ class _PPPProtoField(EnumField):
See RFC 1661 section 2
<https://tools.ietf.org/html/rfc1661#section-2>
+
+ The generated proto field is two bytes when not specified, or when specified
+ as an integer or a string:
+ PPP()
+ PPP(proto=0x21)
+ PPP(proto="Internet Protocol version 4")
+ To explicitly forge a one byte proto field, use the bytes representation:
+ PPP(proto=b'\x21')
"""
def getfield(self, pkt, s):
if ord(s[:1]) & 0x01:
@@ -304,12 +312,18 @@ class _PPPProtoField(EnumField):
return super(_PPPProtoField, self).getfield(pkt, s)
def addfield(self, pkt, s, val):
- if val < 0x100:
- self.fmt = "!B"
- self.sz = 1
+ if isinstance(val, bytes):
+ if len(val) == 1:
+ fmt, sz = "!B", 1
+ elif len(val) == 2:
+ fmt, sz = "!H", 2
+ else:
+ raise TypeError('Invalid length for PPP proto')
+ val = struct.Struct(fmt).unpack(val)[0]
else:
- self.fmt = "!H"
- self.sz = 2
+ fmt, sz = "!H", 2
+ self.fmt = fmt
+ self.sz = sz
self.struct = struct.Struct(self.fmt)
return super(_PPPProtoField, self).addfield(pkt, s, val)

File diff suppressed because it is too large Load Diff

View File

@ -1,4 +1,4 @@
pip-tools==7.3.0 # BSD Keep this in sync with Makefile's PIP_TOOLS_VERSION
pip-tools==7.4.1 # BSD Keep this in sync with Makefile's PIP_TOOLS_VERSION
cryptography!=2.0 # BSD/Apache-2.0
deprecation>=2.0.6 # Apache-2.0
faulthandler; python_version < '3.3' # # BSD License (2 clause)
@ -6,7 +6,7 @@ ipaddress; python_version < '3.3' # PSF
parameterized>=0.6.1 # BSD
pexpect # ISC
psutil # BSD
scapy==2.4.3; python_version >= '2.7' or python_version >= '3.4' # GPL2 https://github.com/secdev/scapy/blob/master/LICENSE
scapy==2.4.5; python_version >= '3.4' # GPL2 https://github.com/secdev/scapy/blob/master/LICENSE
six # MIT
syslog_rfc5424_parser>=0.3.1 # ISC
objgraph # MIT

View File

@ -1,11 +1,13 @@
import unittest
import socket
import struct
import re
import os
from scapy.layers.inet import IP, ICMP, TCP, UDP
from scapy.layers.ipsec import SecurityAssociation, ESP
from scapy.layers.l2 import Ether
from scapy.packet import raw, Raw
from scapy.packet import raw, Raw, Padding
from scapy.layers.inet6 import (
IPv6,
ICMPv6EchoRequest,
@ -22,8 +24,6 @@ from vpp_papi import VppEnum
from vpp_ipsec import VppIpsecSpd, VppIpsecSpdEntry, VppIpsecSpdItfBinding
from ipaddress import ip_address
from re import search
from os import popen
from config import config

View File

@ -121,7 +121,7 @@ class TestDHCP(VppTestCase):
for i in dhcp.options:
if isinstance(i, tuple):
if i[0] == "relay_agent_Information":
if i[0] == "relay_agent_information":
#
# There are two sb-options present - each of length 6.
#
@ -532,7 +532,7 @@ class TestDHCP(VppTestCase):
/ DHCP(
options=[
("message-type", "offer"),
("relay_agent_Information", option_82),
("relay_agent_information", option_82),
("end"),
]
)
@ -543,7 +543,7 @@ class TestDHCP(VppTestCase):
self.pg_enable_capture(self.pg_interfaces)
self.pg_start()
rx = self.pg3.get_capture(1)
rx = self.pg3.get_capture(1, timeout=5)
rx = rx[0]
self.verify_dhcp_offer(rx, self.pg3)
@ -563,7 +563,7 @@ class TestDHCP(VppTestCase):
/ DHCP(
options=[
("message-type", "offer"),
("relay_agent_Information", bad_ip),
("relay_agent_information", bad_ip),
("end"),
]
)
@ -584,7 +584,7 @@ class TestDHCP(VppTestCase):
/ DHCP(
options=[
("message-type", "offer"),
("relay_agent_Information", bad_if_index),
("relay_agent_information", bad_if_index),
("end"),
]
)
@ -748,7 +748,7 @@ class TestDHCP(VppTestCase):
/ DHCP(
options=[
("message-type", "offer"),
("relay_agent_Information", option_82),
("relay_agent_information", option_82),
("end"),
]
)
@ -761,7 +761,7 @@ class TestDHCP(VppTestCase):
/ DHCP(
options=[
("message-type", "offer"),
("relay_agent_Information", option_82),
("relay_agent_information", option_82),
("end"),
]
)
@ -788,7 +788,7 @@ class TestDHCP(VppTestCase):
/ DHCP(
options=[
("message-type", "offer"),
("relay_agent_Information", option_82),
("relay_agent_information", option_82),
("end"),
]
)

View File

@ -307,7 +307,7 @@ class TestDHCPv6IANAControlPlane(VppTestCase):
return addresses.difference(self.initial_addresses)
def validate_duid_ll(self, duid):
DUID_LL(duid)
DUID_LL(bytes(duid))
def validate_packet(self, packet, msg_type, is_resend=False):
try:
@ -562,7 +562,7 @@ class TestDHCPv6PDControlPlane(VppTestCase):
return addresses.difference(self.initial_addresses)
def validate_duid_ll(self, duid):
DUID_LL(duid)
DUID_LL(bytes(duid))
def validate_packet(self, packet, msg_type, is_resend=False):
try:

View File

@ -1215,7 +1215,7 @@ class TemplateInitiator(IkePeer):
if self.no_idr_auth:
self.assertEqual(idi.next_payload, 39) # AUTH
else:
idr = ikev2.IKEv2_payload_IDr(idi.payload)
idr = ikev2.IKEv2_payload_IDr(bytes(idi.payload))
self.assertEqual(idr.load, self.sa.r_id)
prop = idi[ikev2.IKEv2_payload_Proposal]
c = self.sa.child_sas[0]

View File

@ -1456,7 +1456,7 @@ class TestIPLoadBalance(VppTestCase):
src_pkts.append(
(
Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
/ IP(dst="1.1.1.1", src="20.0.0.%d" % ii)
/ IP(dst="1.1.1.1", src="20.0.0.%d" % (ii % 256))
/ UDP(sport=1234, dport=1234)
/ Raw(b"\xa5" * 100)
)

View File

@ -1,7 +1,7 @@
#!/usr/bin/env python3
import unittest
from socket import inet_pton, inet_ntop
from socket import inet_pton, inet_ntop, AF_INET6
from framework import VppTestCase
from asfframework import VppTestRunner
@ -59,8 +59,8 @@ class TestNDPROXY(VppTestCase):
# will come from the VPP's own address.
#
addr = self.pg0.remote_ip6
nsma = in6_getnsma(inet_pton(socket.AF_INET6, addr))
d = inet_ntop(socket.AF_INET6, nsma)
nsma = in6_getnsma(inet_pton(AF_INET6, addr))
d = inet_ntop(AF_INET6, nsma)
# Make pg1 un-numbered to pg0
#

View File

@ -88,7 +88,7 @@ class TestIPIP(VppTestCase):
self.table.remove_vpp_config()
def validate(self, rx, expected):
self.assertEqual(rx, expected.__class__(expected))
self.assertTrue(bytes(rx), bytes(expected))
def generate_ip4_frags(self, payload_length, fragment_size):
p_ether = Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
@ -764,7 +764,7 @@ class TestIPIP6(VppTestCase):
rv = self.vapi.ipip_del_tunnel(sw_if_index=self.tunnel_if_index)
def validate(self, rx, expected):
self.assertEqual(rx, expected.__class__(expected))
self.assertTrue(bytes(rx), bytes(expected))
def generate_ip6_frags(self, payload_length, fragment_size):
p_ether = Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)

View File

@ -518,6 +518,7 @@ class TestIpsecAhHandoff(TemplateIpsecAh, IpsecTun6HandoffTests, IpsecTun4Handof
pass
@unittest.skipIf(True, "Temporarily skip test until Scapy-2.4.5 patch is available")
class TestIpsecAhAll(ConfigIpsecAH, IpsecTra4, IpsecTra6, IpsecTun4, IpsecTun6):
"""Ipsec AH all Algos"""

View File

@ -421,6 +421,7 @@ class TemplateIpsecEsp(ConfigIpsecESP):
super(TemplateIpsecEsp, self).tearDown()
@unittest.skipIf(True, "Temporarily skip test until Scapy-2.4.5 patch is available")
class TestIpsecEsp1(
TemplateIpsecEsp, IpsecTra46Tests, IpsecTun46Tests, IpsecTra6ExtTests
):

View File

@ -4,7 +4,7 @@ import copy
from scapy.layers.ipsec import SecurityAssociation, ESP
from scapy.layers.l2 import Ether, GRE, Dot1Q
from scapy.packet import Raw, bind_layers
from scapy.packet import Raw, bind_layers, Padding
from scapy.layers.inet import IP, UDP, ICMP
from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest
from scapy.contrib.mpls import MPLS

View File

@ -1,6 +1,7 @@
#!/usr/bin/env python3
import unittest
import socket
from scapy.layers.inet import IP, UDP
from scapy.layers.inet6 import IPv6, Raw
@ -241,7 +242,7 @@ class TestLinuxCP(VppTestCase):
self.assertEqual(rx[Ether].dst, phy.remote_mac)
self.assertEqual(rx[IP].dst, phy.remote_ip4)
self.assertEqual(rx[IP].src, phy.local_ip4)
inner = IP(rx[IP].payload)
inner = IP(bytes(rx[IP].payload))
self.assertEqual(inner.src, tun4.local_ip4)
self.assertEqual(inner.dst, "2.2.2.2")
@ -254,7 +255,7 @@ class TestLinuxCP(VppTestCase):
for rx in rxs:
self.assertEqual(rx[IPv6].dst, phy.remote_ip6)
self.assertEqual(rx[IPv6].src, phy.local_ip6)
inner = IPv6(rx[IPv6].payload)
inner = IPv6(bytes(rx[IPv6].payload))
self.assertEqual(inner.src, tun6.local_ip6)
self.assertEqual(inner.dst, "2::2")
@ -269,7 +270,7 @@ class TestLinuxCP(VppTestCase):
rxs = self.send_and_expect(phy, p * N_PKTS, self.pg4)
for rx in rxs:
rx = IP(rx)
rx = IP(bytes(rx))
self.assertEqual(rx[IP].dst, tun4.local_ip4)
self.assertEqual(rx[IP].src, tun4.remote_ip4)
@ -284,7 +285,7 @@ class TestLinuxCP(VppTestCase):
rxs = self.send_and_expect(phy, p * N_PKTS, self.pg5)
for rx in rxs:
rx = IPv6(rx)
rx = IPv6(bytes(rx))
self.assertEqual(rx[IPv6].dst, tun6.local_ip6)
self.assertEqual(rx[IPv6].src, tun6.remote_ip6)
@ -353,7 +354,7 @@ class TestLinuxCPIpsec(TemplateIpsec, TemplateIpsecItf4, IpsecTun4):
def verify_decrypted(self, p, rxs):
for rx in rxs:
rx = IP(rx)
rx = IP(bytes(rx))
self.assert_equal(rx[IP].src, p.tun_if.remote_ip4)
self.assert_equal(rx[IP].dst, p.tun_if.local_ip4)
self.assert_packet_checksums_valid(rx)

View File

@ -55,7 +55,7 @@ class TestMTU(VppTestCase):
i.admin_down()
def validate(self, rx, expected):
self.assertEqual(rx, expected.__class__(expected))
self.assertTrue(bytes(rx), bytes(expected))
def validate_bytes(self, rx, expected):
self.assertEqual(rx, expected)
@ -104,13 +104,10 @@ class TestMTU(VppTestCase):
/ p_ip4
/ p_payload
)
n = icmp4_reply.__class__(icmp4_reply)
s = bytes(icmp4_reply)
icmp4_reply = s[0:576]
rx = self.send_and_expect_some(self.pg0, p4 * 11, self.pg0)
for p in rx:
# p.show2()
# n.show2()
self.validate_bytes(bytes(p[1]), icmp4_reply)
self.assert_error_counter_equal("/err/ip4-input/mtu_exceeded", 11)
@ -185,7 +182,6 @@ class TestMTU(VppTestCase):
/ p_payload
)
icmp6_reply[2].hlim -= 1
n = icmp6_reply.__class__(icmp6_reply)
s = bytes(icmp6_reply)
icmp6_reply_str = s[0:1280]

View File

@ -3,6 +3,8 @@
import random
import unittest
import struct
import socket
from scapy.layers.inet import Ether, IP, TCP
from scapy.packet import Raw
from scapy.data import IP_PROTOS

View File

@ -83,7 +83,7 @@ class TestPgTun(VppTestCase):
rxs = self.send_and_expect(self.pg0, p * N_PKTS, self.pg1)
for rx in rxs:
rx = IP(rx)
rx = IP(bytes(rx))
self.assertFalse(rx.haslayer(Ether))
self.assertEqual(rx[IP].dst, self.pg1.remote_ip4)
@ -97,7 +97,7 @@ class TestPgTun(VppTestCase):
rxs = self.send_and_expect(self.pg0, p * N_PKTS, self.pg2)
for rx in rxs:
rx = IPv6(rx)
rx = IPv6(bytes(rx))
self.assertFalse(rx.haslayer(Ether))
self.assertEqual(rx[IPv6].dst, self.pg2.remote_ip6)

View File

@ -40,7 +40,7 @@ class TestPNAT(VppTestCase):
i.admin_down()
def validate(self, rx, expected):
self.assertEqual(rx, expected.__class__(expected))
self.assertTrue(bytes(rx), bytes(expected))
def validate_bytes(self, rx, expected):
self.assertEqual(rx, expected)

View File

@ -224,7 +224,6 @@ class TestPPPoE(VppTestCase):
)
pppoe_if.add_vpp_config()
pppoe_if.set_unnumbered(self.pg0.sw_if_index)
#
# Send tunneled packets that match the created tunnel and
# are decapped and forwarded

View File

@ -3,7 +3,8 @@
import unittest
from scapy.packet import Raw
from scapy.layers.l2 import Ether, GRE, ERSPAN
from scapy.layers.l2 import Ether, GRE
from scapy.contrib.erspan import ERSPAN
from scapy.layers.inet import IP, UDP
from scapy.layers.vxlan import VXLAN

View File

@ -633,9 +633,9 @@ class TestUdpEncap(VppTestCase):
)
rx = self.send_and_expect(self.pg0, p_4 * NUM_PKTS, self.pg0)
p_4 = IP(p_4["UDP"].payload)
p_4 = IP(bytes(p_4["UDP"].payload))
for p in rx:
p = IP(p["Ether"].payload)
p = IP(bytes(p["Ether"].payload))
self.validate_inner4(p, p_4, ttl=63)
#
@ -651,10 +651,10 @@ class TestUdpEncap(VppTestCase):
)
rx = self.send_and_expect(self.pg1, p_6 * NUM_PKTS, self.pg1)
p_6 = IPv6(p_6["UDP"].payload)
p = IPv6(rx[0]["Ether"].payload)
p_6 = IPv6(bytes(p_6["UDP"].payload))
p = IPv6(bytes(rx[0]["Ether"].payload))
for p in rx:
p = IPv6(p["Ether"].payload)
p = IPv6(bytes(p["Ether"].payload))
self.validate_inner6(p, p_6, hlim=63)
#
@ -673,9 +673,9 @@ class TestUdpEncap(VppTestCase):
self.pg2.enable_mpls()
rx = self.send_and_expect(self.pg2, p_mo4 * NUM_PKTS, self.pg2)
self.pg2.disable_mpls()
p_mo4 = IP(MPLS(p_mo4["UDP"].payload).payload)
p_mo4 = IP(bytes(MPLS(bytes(p_mo4["UDP"].payload)).payload))
for p in rx:
p = IP(p["Ether"].payload)
p = IP(bytes(p["Ether"].payload))
self.validate_inner4(p, p_mo4, ttl=63)

View File

@ -258,7 +258,7 @@ class TestVxlan6(BridgeDomain, VppTestCase):
reassembled = util.reassemble4(payload)
self.assertEqual(Ether(raw(frame))[IP], reassembled[IP])
self.assertEqual(Ether(bytes(frame))[IP], reassembled[IP])
"""
Tests with default port (4789)

View File

@ -261,7 +261,7 @@ class VppWgPeer(VppObject):
def mk_cookie(self, p, tx_itf, is_resp=False, is_ip6=False):
self.verify_header(p, is_ip6)
wg_pkt = Wireguard(p[Raw])
wg_pkt = Wireguard(bytes(p[Raw]))
if is_resp:
self._test.assertEqual(wg_pkt[Wireguard].message_type, 2)
@ -310,7 +310,7 @@ class VppWgPeer(VppObject):
def consume_cookie(self, p, is_ip6=False):
self.verify_header(p, is_ip6)
cookie_reply = Wireguard(p[Raw])
cookie_reply = Wireguard(bytes(p[Raw]))
self._test.assertEqual(cookie_reply[Wireguard].message_type, 3)
self._test.assertEqual(cookie_reply[Wireguard].reserved_zero, 0)
@ -390,7 +390,7 @@ class VppWgPeer(VppObject):
self.noise_init(self.itf.public_key)
self.verify_header(p, is_ip6)
init = Wireguard(p[Raw])
init = Wireguard(bytes(p[Raw]))
self._test.assertEqual(init[Wireguard].message_type, 1)
self._test.assertEqual(init[Wireguard].reserved_zero, 0)
@ -438,9 +438,7 @@ class VppWgPeer(VppObject):
def consume_response(self, p, is_ip6=False):
self.verify_header(p, is_ip6)
resp = Wireguard(p[Raw])
resp = Wireguard(bytes(p[Raw]))
self._test.assertEqual(resp[Wireguard].message_type, 2)
self._test.assertEqual(resp[Wireguard].reserved_zero, 0)
self._test.assertEqual(
@ -456,7 +454,7 @@ class VppWgPeer(VppObject):
def decrypt_transport(self, p, is_ip6=False):
self.verify_header(p, is_ip6)
p = Wireguard(p[Raw])
p = Wireguard(bytes(p[Raw]))
self._test.assertEqual(p[Wireguard].message_type, 4)
self._test.assertEqual(p[Wireguard].reserved_zero, 0)
self._test.assertEqual(
@ -501,7 +499,7 @@ class VppWgPeer(VppObject):
def is_handshake_init(p):
wg_p = Wireguard(p[Raw])
wg_p = Wireguard(bytes(p[Raw]))
return wg_p[Wireguard].message_type == 1