make test: unify packet checksum verifications
Change-Id: If9cc7c5e32ebecff398fd38b39e8f485754a4ad4 Signed-off-by: Klement Sekera <ksekera@cisco.com>
This commit is contained in:

committed by
Damjan Marion

parent
ee6ddceaf5
commit
d81ae41825
@ -25,6 +25,9 @@ from vpp_papi_provider import VppPapiProvider
|
||||
from log import RED, GREEN, YELLOW, double_line_delim, single_line_delim, \
|
||||
getLogger, colorize
|
||||
from vpp_object import VppObjectRegistry
|
||||
from scapy.layers.inet import IPerror, TCPerror, UDPerror, ICMPerror
|
||||
from scapy.layers.inet6 import ICMPv6DestUnreach, ICMPv6EchoRequest
|
||||
from scapy.layers.inet6 import ICMPv6EchoReply
|
||||
if os.name == 'posix' and sys.version_info[0] < 3:
|
||||
# using subprocess32 is recommended by python official documentation
|
||||
# @ https://docs.python.org/2/library/subprocess.html
|
||||
@ -32,6 +35,7 @@ if os.name == 'posix' and sys.version_info[0] < 3:
|
||||
else:
|
||||
import subprocess
|
||||
|
||||
|
||||
debug_framework = False
|
||||
if os.getenv('TEST_DEBUG', "0") == "1":
|
||||
debug_framework = True
|
||||
@ -729,6 +733,87 @@ class VppTestCase(unittest.TestCase):
|
||||
name, real_value, expected_min, expected_max)
|
||||
self.assertTrue(expected_min <= real_value <= expected_max, msg)
|
||||
|
||||
def assert_packet_checksums_valid(self, packet,
|
||||
ignore_zero_udp_checksums=True):
|
||||
udp_layers = ['UDP', 'UDPerror']
|
||||
checksum_fields = ['cksum', 'chksum']
|
||||
checksums = []
|
||||
counter = 0
|
||||
temp = packet.__class__(str(packet))
|
||||
while True:
|
||||
layer = temp.getlayer(counter)
|
||||
if layer:
|
||||
for cf in checksum_fields:
|
||||
if hasattr(layer, cf):
|
||||
if ignore_zero_udp_checksums and \
|
||||
0 == getattr(layer, cf) and \
|
||||
layer.name in udp_layers:
|
||||
continue
|
||||
delattr(layer, cf)
|
||||
checksums.append((counter, cf))
|
||||
else:
|
||||
break
|
||||
counter = counter + 1
|
||||
temp = temp.__class__(str(temp))
|
||||
for layer, cf in checksums:
|
||||
self.assert_equal(getattr(packet[layer], cf),
|
||||
getattr(temp[layer], cf),
|
||||
"packet checksum on layer #%d: %s" % (
|
||||
layer, temp[layer].name))
|
||||
|
||||
def assert_checksum_valid(self, received_packet, layer,
|
||||
field_name='chksum',
|
||||
ignore_zero_checksum=False):
|
||||
""" Check checksum of received packet on given layer """
|
||||
received_packet_checksum = getattr(received_packet[layer], field_name)
|
||||
if ignore_zero_checksum and 0 == received_packet_checksum:
|
||||
return
|
||||
recalculated = received_packet.__class__(str(received_packet))
|
||||
delattr(recalculated[layer], field_name)
|
||||
recalculated = recalculated.__class__(str(recalculated))
|
||||
self.assert_equal(received_packet_checksum,
|
||||
getattr(recalculated[layer], field_name),
|
||||
"packet checksum on layer: %s" % layer)
|
||||
|
||||
def assert_ip_checksum_valid(self, received_packet,
|
||||
ignore_zero_checksum=False):
|
||||
self.assert_checksum_valid(received_packet, 'IP',
|
||||
ignore_zero_checksum=ignore_zero_checksum)
|
||||
|
||||
def assert_tcp_checksum_valid(self, received_packet,
|
||||
ignore_zero_checksum=False):
|
||||
self.assert_checksum_valid(received_packet, 'TCP',
|
||||
ignore_zero_checksum=ignore_zero_checksum)
|
||||
|
||||
def assert_udp_checksum_valid(self, received_packet,
|
||||
ignore_zero_checksum=True):
|
||||
self.assert_checksum_valid(received_packet, 'UDP',
|
||||
ignore_zero_checksum=ignore_zero_checksum)
|
||||
|
||||
def assert_embedded_icmp_checksum_valid(self, received_packet):
|
||||
if received_packet.haslayer(IPerror):
|
||||
self.assert_checksum_valid(received_packet, 'IPerror')
|
||||
if received_packet.haslayer(TCPerror):
|
||||
self.assert_checksum_valid(received_packet, 'TCPerror')
|
||||
if received_packet.haslayer(UDPerror):
|
||||
self.assert_checksum_valid(received_packet, 'UDPerror',
|
||||
ignore_zero_checksum=True)
|
||||
if received_packet.haslayer(ICMPerror):
|
||||
self.assert_checksum_valid(received_packet, 'ICMPerror')
|
||||
|
||||
def assert_icmp_checksum_valid(self, received_packet):
|
||||
self.assert_checksum_valid(received_packet, 'ICMP')
|
||||
self.assert_embedded_icmp_checksum_valid(received_packet)
|
||||
|
||||
def assert_icmpv6_checksum_valid(self, pkt):
|
||||
if pkt.haslayer(ICMPv6DestUnreach):
|
||||
self.assert_checksum_valid(pkt, 'ICMPv6DestUnreach', 'cksum')
|
||||
self.assert_embedded_icmp_checksum_valid(pkt)
|
||||
if pkt.haslayer(ICMPv6EchoRequest):
|
||||
self.assert_checksum_valid(pkt, 'ICMPv6EchoRequest', 'cksum')
|
||||
if pkt.haslayer(ICMPv6EchoReply):
|
||||
self.assert_checksum_valid(pkt, 'ICMPv6EchoReply', 'cksum')
|
||||
|
||||
@classmethod
|
||||
def sleep(cls, timeout, remark=None):
|
||||
if hasattr(cls, 'logger'):
|
||||
|
Reference in New Issue
Block a user