VPP-1508 python3 tests: .encode('hex')
Change to binascii.hexlify() for consistent bahavior. Change-Id: Ie430cdd1ffeb6510db4aa037546e42d85992093b Signed-off-by: Paul Vinciguerra <pvinci@vinciconsulting.com>
This commit is contained in:
committed by
Ole Trøan
parent
6c746172ef
commit
6e4c6ad92e
@@ -1,6 +1,7 @@
|
||||
#!/usr/bin/env python
|
||||
"""ACL plugin - MACIP tests
|
||||
"""
|
||||
import binascii
|
||||
import random
|
||||
import re
|
||||
import unittest
|
||||
@@ -185,8 +186,8 @@ class MethodHolder(VppTestCase):
|
||||
rule = "DENY "
|
||||
print " IP6" if r.is_ipv6 else " IP4", \
|
||||
rule, \
|
||||
r.src_mac.encode('hex'), \
|
||||
r.src_mac_mask.encode('hex'),\
|
||||
binascii.hexlify(r.src_mac), \
|
||||
binascii.hexlify(r.src_mac_mask),\
|
||||
unpack('<16B', r.src_ip_addr), \
|
||||
r.src_ip_prefix_len
|
||||
return acls
|
||||
@@ -575,8 +576,8 @@ class MethodHolder(VppTestCase):
|
||||
# TODO : verify
|
||||
# for acl in acls:
|
||||
# for r in acl.r:
|
||||
# print r.src_mac.encode('hex'), \
|
||||
# r.src_mac_mask.encode('hex'),\
|
||||
# print binascii.hexlify(r.src_mac), \
|
||||
# binascii.hexlify(r.src_mac_mask),\
|
||||
# unpack('<16B', r.src_ip_addr), \
|
||||
# r.src_ip_prefix_len
|
||||
#
|
||||
|
||||
@@ -1,9 +1,8 @@
|
||||
#!/usr/bin/env python
|
||||
|
||||
import unittest
|
||||
import socket
|
||||
import binascii
|
||||
import sys
|
||||
import socket
|
||||
import unittest
|
||||
|
||||
from framework import VppTestCase, VppTestRunner
|
||||
|
||||
@@ -218,9 +217,9 @@ class TestClassifier(VppTestCase):
|
||||
:param int dst_port: destination port number "x"
|
||||
"""
|
||||
if src_ip:
|
||||
src_ip = socket.inet_aton(src_ip).encode('hex')
|
||||
src_ip = binascii.hexlify(socket.inet_aton(src_ip))
|
||||
if dst_ip:
|
||||
dst_ip = socket.inet_aton(dst_ip).encode('hex')
|
||||
dst_ip = binascii.hexlify(socket.inet_aton(dst_ip))
|
||||
|
||||
return ('{:0>20}{:0>12}{:0>8}{:0>4}{:0>4}'.format(
|
||||
hex(proto)[2:], src_ip, dst_ip, hex(src_port)[2:],
|
||||
|
||||
@@ -184,9 +184,11 @@ class TestClassifier(VppTestCase):
|
||||
:param int dst_port: destination port number "x"
|
||||
"""
|
||||
if src_ip:
|
||||
src_ip = socket.inet_pton(socket.AF_INET6, src_ip).encode('hex')
|
||||
src_ip = binascii.hexlify(socket.inet_pton(
|
||||
socket.AF_INET6, src_ip))
|
||||
if dst_ip:
|
||||
dst_ip = socket.inet_pton(socket.AF_INET6, dst_ip).encode('hex')
|
||||
dst_ip = binascii.hexlify(socket.inet_pton(
|
||||
socket.AF_INET6, dst_ip))
|
||||
|
||||
return ('{:0>14}{:0>34}{:0>32}{:0>4}{:0>4}'.format(
|
||||
hex(nh)[2:], src_ip, dst_ip, hex(src_port)[2:],
|
||||
|
||||
@@ -1,4 +1,6 @@
|
||||
#!/usr/bin/env python
|
||||
|
||||
import binascii
|
||||
import random
|
||||
import socket
|
||||
import unittest
|
||||
@@ -196,8 +198,8 @@ class MethodHolder(VppTestCase):
|
||||
if cflow.haslayer(Data):
|
||||
data = decoder.decode_data_set(cflow.getlayer(Set))
|
||||
for record in data:
|
||||
self.assertEqual(int(record[1].encode('hex'), 16), octets)
|
||||
self.assertEqual(int(record[2].encode('hex'), 16), packets)
|
||||
self.assertEqual(int(binascii.hexlify(record[1]), 16), octets)
|
||||
self.assertEqual(int(binascii.hexlify(record[2]), 16), packets)
|
||||
|
||||
def send_packets(self, src_if=None, dst_if=None):
|
||||
if src_if is None:
|
||||
@@ -225,9 +227,9 @@ class MethodHolder(VppTestCase):
|
||||
if data_set is not None:
|
||||
for record in data:
|
||||
# skip flow if in/out gress interface is 0
|
||||
if int(record[10].encode('hex'), 16) == 0:
|
||||
if int(binascii.hexlify(record[10]), 16) == 0:
|
||||
continue
|
||||
if int(record[14].encode('hex'), 16) == 0:
|
||||
if int(binascii.hexlify(record[14]), 16) == 0:
|
||||
continue
|
||||
|
||||
for field in data_set:
|
||||
@@ -247,7 +249,7 @@ class MethodHolder(VppTestCase):
|
||||
else:
|
||||
ip = socket.inet_pton(socket.AF_INET6,
|
||||
ip_layer.src)
|
||||
value = int(ip.encode('hex'), 16)
|
||||
value = int(binascii.hexlify(ip), 16)
|
||||
elif value == 'dst_ip':
|
||||
if ip_ver == 'v4':
|
||||
ip = socket.inet_pton(socket.AF_INET,
|
||||
@@ -255,12 +257,13 @@ class MethodHolder(VppTestCase):
|
||||
else:
|
||||
ip = socket.inet_pton(socket.AF_INET6,
|
||||
ip_layer.dst)
|
||||
value = int(ip.encode('hex'), 16)
|
||||
value = int(binascii.hexlify(ip), 16)
|
||||
elif value == 'sport':
|
||||
value = int(capture[0][UDP].sport)
|
||||
elif value == 'dport':
|
||||
value = int(capture[0][UDP].dport)
|
||||
self.assertEqual(int(record[field].encode('hex'), 16),
|
||||
self.assertEqual(int(binascii.hexlify(
|
||||
record[field]), 16),
|
||||
value)
|
||||
|
||||
def verify_cflow_data_notimer(self, decoder, capture, cflows):
|
||||
@@ -274,8 +277,10 @@ class MethodHolder(VppTestCase):
|
||||
for rec in data:
|
||||
p = capture[idx]
|
||||
idx += 1
|
||||
self.assertEqual(p[IP].len, int(rec[1].encode('hex'), 16))
|
||||
self.assertEqual(1, int(rec[2].encode('hex'), 16))
|
||||
self.assertEqual(p[IP].len, int(
|
||||
binascii.hexlify(rec[1]), 16))
|
||||
self.assertEqual(1, int(
|
||||
binascii.hexlify(rec[2]), 16))
|
||||
self.assertEqual(len(capture), idx)
|
||||
|
||||
def wait_for_cflow_packet(self, collector_intf, set_id=2, timeout=1,
|
||||
@@ -411,25 +416,25 @@ class Flowprobe(MethodHolder):
|
||||
if cflow.haslayer(Data):
|
||||
record = ipfix_decoder.decode_data_set(cflow[0].getlayer(Set))[0]
|
||||
# ingress interface
|
||||
self.assertEqual(int(record[10].encode('hex'), 16), 8)
|
||||
self.assertEqual(int(binascii.hexlify(record[10]), 16), 8)
|
||||
# egress interface
|
||||
self.assertEqual(int(record[14].encode('hex'), 16), 9)
|
||||
self.assertEqual(int(binascii.hexlify(record[14]), 16), 9)
|
||||
# packets
|
||||
self.assertEqual(int(record[2].encode('hex'), 16), 1)
|
||||
self.assertEqual(int(binascii.hexlify(record[2]), 16), 1)
|
||||
# src mac
|
||||
self.assertEqual(':'.join(re.findall('..', record[56].encode(
|
||||
'hex'))), self.pg8.local_mac)
|
||||
# dst mac
|
||||
self.assertEqual(':'.join(re.findall('..', record[80].encode(
|
||||
'hex'))), self.pg8.remote_mac)
|
||||
flowTimestamp = int(record[156].encode('hex'), 16) >> 32
|
||||
flowTimestamp = int(binascii.hexlify(record[156]), 16) >> 32
|
||||
# flow start timestamp
|
||||
self.assertAlmostEqual(flowTimestamp, nowUNIX, delta=1)
|
||||
flowTimestamp = int(record[157].encode('hex'), 16) >> 32
|
||||
flowTimestamp = int(binascii.hexlify(record[157]), 16) >> 32
|
||||
# flow end timestamp
|
||||
self.assertAlmostEqual(flowTimestamp, nowUNIX, delta=1)
|
||||
# ethernet type
|
||||
self.assertEqual(int(record[256].encode('hex'), 16), 8)
|
||||
self.assertEqual(int(binascii.hexlify(record[256]), 16), 8)
|
||||
# src ip
|
||||
self.assertEqual('.'.join(re.findall('..', record[8].encode(
|
||||
'hex'))),
|
||||
@@ -441,13 +446,13 @@ class Flowprobe(MethodHolder):
|
||||
'.'.join('{:02x}'.format(int(n)) for n in
|
||||
"9.0.0.100".split('.')))
|
||||
# protocol (TCP)
|
||||
self.assertEqual(int(record[4].encode('hex'), 16), 6)
|
||||
self.assertEqual(int(binascii.hexlify(record[4]), 16), 6)
|
||||
# src port
|
||||
self.assertEqual(int(record[7].encode('hex'), 16), 1234)
|
||||
self.assertEqual(int(binascii.hexlify(record[7]), 16), 1234)
|
||||
# dst port
|
||||
self.assertEqual(int(record[11].encode('hex'), 16), 4321)
|
||||
self.assertEqual(int(binascii.hexlify(record[11]), 16), 4321)
|
||||
# tcp flags
|
||||
self.assertEqual(int(record[6].encode('hex'), 16), 80)
|
||||
self.assertEqual(int(binascii.hexlify(record[6]), 16), 80)
|
||||
|
||||
ipfix.remove_vpp_config()
|
||||
self.logger.info("FFP_TEST_FINISH_0000")
|
||||
|
||||
@@ -1,4 +1,5 @@
|
||||
#!/usr/bin/env python
|
||||
import binascii
|
||||
import random
|
||||
import socket
|
||||
import unittest
|
||||
@@ -302,9 +303,8 @@ class TestIPv4FibCrud(VppTestCase):
|
||||
:return list: added ips with 32 prefix
|
||||
"""
|
||||
added_ips = []
|
||||
dest_addr = int(socket.inet_pton(socket.AF_INET,
|
||||
start_dest_addr).encode('hex'),
|
||||
16)
|
||||
dest_addr = int(binascii.hexlify(socket.inet_pton(socket.AF_INET,
|
||||
start_dest_addr)), 16)
|
||||
dest_addr_len = 32
|
||||
n_next_hop_addr = socket.inet_pton(socket.AF_INET, next_hop_addr)
|
||||
for _ in range(count):
|
||||
@@ -318,9 +318,8 @@ class TestIPv4FibCrud(VppTestCase):
|
||||
def unconfig_fib_many_to_one(self, start_dest_addr, next_hop_addr, count):
|
||||
|
||||
removed_ips = []
|
||||
dest_addr = int(socket.inet_pton(socket.AF_INET,
|
||||
start_dest_addr).encode('hex'),
|
||||
16)
|
||||
dest_addr = int(binascii.hexlify(socket.inet_pton(socket.AF_INET,
|
||||
start_dest_addr)), 16)
|
||||
dest_addr_len = 32
|
||||
n_next_hop_addr = socket.inet_pton(socket.AF_INET, next_hop_addr)
|
||||
for _ in range(count):
|
||||
|
||||
@@ -1,3 +1,4 @@
|
||||
import binascii
|
||||
import socket
|
||||
from abc import abstractmethod, ABCMeta
|
||||
|
||||
@@ -232,7 +233,7 @@ class VppInterface(object):
|
||||
if intf.sw_if_index == self.sw_if_index:
|
||||
self._name = intf.interface_name.split(b'\0', 1)[0]
|
||||
self._local_mac = \
|
||||
':'.join(intf.l2_address.encode('hex')[i:i + 2]
|
||||
':'.join(binascii.hexlify(intf.l2_address)[i:i + 2]
|
||||
for i in range(0, 12, 2))
|
||||
self._dump = intf
|
||||
break
|
||||
|
||||
Reference in New Issue
Block a user