PAPI: Allow ipaddress object as argument and return values from API calls

The API calls that use any of vl_api_address_t, vl_api_ip4_address,
vl_api_ip6_address_t, vl_api_prefix_t, vl_api_ip4_prefix_t,
vl_api_ip6_prefix_t now accepts either the old style dictionary,
a text string (2001:db8::/32) or an ipaddress ojbect.

Unless it is called with '_no_type_conversion':True, it will
also return an appropriate ipaddress object.

Change-Id: I84e4a1577bd57f6b5ae725f316a523988b6a955b
Signed-off-by: Ole Troan <ot@cisco.com>
This commit is contained in:
Ole Troan
2018-12-11 13:04:01 +01:00
committed by Damjan Marion
parent 41b25cf638
commit 0bcad32b38
13 changed files with 290 additions and 259 deletions

View File

@ -4,10 +4,10 @@ import unittest
from vpp_papi.vpp_serializer import VPPType, VPPEnumType
from vpp_papi.vpp_serializer import VPPUnionType, VPPMessage
from vpp_papi.vpp_serializer import VPPTypeAlias
from vpp_papi.vpp_format import VPPFormat
from socket import inet_pton, AF_INET, AF_INET6
import logging
import sys
from ipaddress import *
class TestAddType(unittest.TestCase):
@ -27,8 +27,10 @@ class TestAddType(unittest.TestCase):
af = VPPEnumType('vl_api_address_family_t', [["ADDRESS_IP4", 0],
["ADDRESS_IP6", 1],
{"enumtype": "u32"}])
ip4 = VPPType('vl_api_ip4_address_t', [['u8', 'address', 4]])
ip6 = VPPType('vl_api_ip6_address_t', [['u8', 'address', 16]])
ip4 = VPPTypeAlias('vl_api_ip4_address_t', {'type': 'u8',
'length': 4})
ip6 = VPPTypeAlias('vl_api_ip6_address_t', {'type': 'u8',
'length': 16})
VPPUnionType('vl_api_address_union_t',
[["vl_api_ip4_address_t", "ip4"],
["vl_api_ip6_address_t", "ip6"]])
@ -47,41 +49,34 @@ class TestAddType(unittest.TestCase):
'vla_address'],
['u8', 'is_cool']])
b = ip4.pack({'address': inet_pton(AF_INET, '1.1.1.1')})
b = ip4.pack(inet_pton(AF_INET, '1.1.1.1'))
self.assertEqual(len(b), 4)
nt, size = ip4.unpack(b)
self.assertEqual(nt.address, inet_pton(AF_INET, '1.1.1.1'))
self.assertEqual(str(nt), '1.1.1.1')
b = ip6.pack({'address': inet_pton(AF_INET6, '1::1')})
b = ip6.pack(inet_pton(AF_INET6, '1::1'))
self.assertEqual(len(b), 16)
b = address.pack({'af': af.ADDRESS_IP4,
'un':
{'ip4':
{'address': inet_pton(AF_INET, '2.2.2.2')}}})
{'ip4': inet_pton(AF_INET, '2.2.2.2')}})
self.assertEqual(len(b), 20)
nt, size = address.unpack(b)
self.assertEqual(nt.af, af.ADDRESS_IP4)
self.assertEqual(nt.un.ip4.address,
inet_pton(AF_INET, '2.2.2.2'))
self.assertEqual(nt.un.ip6.address,
inet_pton(AF_INET6, '0202:0202::'))
self.assertEqual(str(nt), '2.2.2.2')
# List of addresses
address_list = []
for i in range(4):
address_list.append({'af': af.ADDRESS_IP4,
'un':
{'ip4':
{'address': inet_pton(AF_INET, '2.2.2.2')}}})
{'ip4': inet_pton(AF_INET, '2.2.2.2')}})
b = va_address_list.pack({'count': len(address_list),
'addresses': address_list})
self.assertEqual(len(b), 81)
nt, size = va_address_list.unpack(b)
self.assertEqual(nt.addresses[0].un.ip4.address,
inet_pton(AF_INET, '2.2.2.2'))
self.assertEqual(str(nt.addresses[0]), '2.2.2.2')
b = message_with_va_address_list.pack({'vla_address':
{'count': len(address_list),
@ -97,6 +92,12 @@ class TestAddType(unittest.TestCase):
{"enumtype": "u32"}])
ip4 = VPPTypeAlias('vl_api_ip4_address_t', {'type': 'u8',
'length': 4})
b = ip4.pack('1.1.1.1')
self.assertEqual(len(b), 4)
nt, size = ip4.unpack(b)
self.assertEqual(str(nt), '1.1.1.1')
ip6 = VPPTypeAlias('vl_api_ip6_address_t', {'type': 'u8',
'length': 16})
VPPUnionType('vl_api_address_union_t',
@ -108,44 +109,65 @@ class TestAddType(unittest.TestCase):
['vl_api_address_union_t', 'un']])
prefix = VPPType('vl_api_prefix_t',
[['vl_api_address_t', 'address'],
['u8', 'address_length']])
[['vl_api_address_t', 'address'],
['u8', 'address_length']])
message = VPPMessage('svs',
[['vl_api_prefix_t', 'prefix']])
[['vl_api_prefix_t', 'prefix']])
message_addr = VPPMessage('svs_address',
[['vl_api_address_t', 'address']])
b = message_addr.pack({'address': "1::1"})
self.assertEqual(len(b), 20)
nt, size = message_addr.unpack(b)
self.assertEqual("1::1", VPPFormat.unformat(nt.address))
self.assertEqual("1::1", str(nt.address))
b = message_addr.pack({'address': "1.1.1.1"})
self.assertEqual(len(b), 20)
nt, size = message_addr.unpack(b)
self.assertEqual("1.1.1.1", VPPFormat.unformat(nt.address))
self.assertEqual("1.1.1.1", str(nt.address))
b = message.pack({'prefix': "1.1.1.1/24"})
b = message.pack({'prefix': "1.1.1.0/24"})
self.assertEqual(len(b), 21)
nt, size = message.unpack(b)
self.assertEqual("1.1.1.1/24", VPPFormat.unformat(nt.prefix))
self.assertEqual("1.1.1.0/24", str(nt.prefix))
message_array = VPPMessage('address_array',
[['vl_api_ip4_address_t',
[['vl_api_ip6_address_t',
'addresses', 2]])
b = message_array.pack({'addresses': ["1::1", "2::2"]})
self.assertEqual(len(b), 8)
b = message_array.pack({'addresses': [IPv6Address(u"1::1"), "2::2"]})
self.assertEqual(len(b), 32)
message_array_vla = VPPMessage('address_array_vla',
[['u32', 'num'],
['vl_api_ip4_address_t',
['vl_api_ip6_address_t',
'addresses', 0, 'num']])
b = message_array_vla.pack({'addresses': ["1::1", "2::2"], 'num': 2})
self.assertEqual(len(b), 12)
self.assertEqual(len(b), 36)
message_array4 = VPPMessage('address_array4',
[['vl_api_ip4_address_t',
'addresses', 2]])
b = message_array4.pack({'addresses': ["1.1.1.1", "2.2.2.2"]})
self.assertEqual(len(b), 8)
b = message_array4.pack({'addresses': [IPv4Address(u"1.1.1.1"),
"2.2.2.2"]})
self.assertEqual(len(b), 8)
message = VPPMessage('address', [['vl_api_address_t', 'address']])
b = message.pack({'address': '1::1'})
self.assertEqual(len(b), 20)
b = message.pack({'address': '1.1.1.1'})
self.assertEqual(len(b), 20)
message = VPPMessage('prefix', [['vl_api_prefix_t', 'prefix']])
b = message.pack({'prefix': '1::1/130'})
self.assertEqual(len(b), 21)
b = message.pack({'prefix': IPv6Network(u'1::/119')})
self.assertEqual(len(b), 21)
b = message.pack({'prefix': IPv4Network(u'1.1.0.0/16')})
self.assertEqual(len(b), 21)
def test_zero_vla(self):
'''Default zero'ed out for VLAs'''
list = VPPType('vl_api_list_t',
[['u8', 'count', 10]])
[['u8', 'count', 10]])
# Define an embedded VLA type
valist = VPPType('vl_api_valist_t',
@ -153,12 +175,12 @@ class TestAddType(unittest.TestCase):
['u8', 'string', 0, 'count']])
# Define a message
vamessage = VPPMessage('vamsg',
[['vl_api_valist_t', 'valist'],
['u8', 'is_something']])
[['vl_api_valist_t', 'valist'],
['u8', 'is_something']])
message = VPPMessage('msg',
[['vl_api_list_t', 'list'],
['u8', 'is_something']])
[['vl_api_list_t', 'list'],
['u8', 'is_something']])
# Pack message without VLA specified
b = message.pack({'is_something': 1})

View File

@ -15,137 +15,107 @@
from socket import inet_pton, inet_ntop, AF_INET6, AF_INET
import socket
import ipaddress
# Copies from vl_api_address_t definition
ADDRESS_IP4 = 0
ADDRESS_IP6 = 1
#
# Type conversion for input arguments and return values
#
class VPPFormatError(Exception):
pass
def format_vl_api_address_t(args):
try:
return {'un': {'ip6': inet_pton(AF_INET6, args)},
'af': ADDRESS_IP6}
except socket.error as e:
return {'un': {'ip4': inet_pton(AF_INET, args)},
'af': ADDRESS_IP4}
class VPPFormat(object):
VPPFormatError = VPPFormatError
def format_vl_api_prefix_t(args):
p, length = args.split('/')
return {'address': format_vl_api_address_t(p),
'address_length': int(length)}
@staticmethod
def format_vl_api_ip6_prefix_t(args):
prefix, len = args.split('/')
return {'prefix': {'address': inet_pton(AF_INET6, prefix)},
'len': int(len)}
@staticmethod
def unformat_vl_api_ip6_prefix_t(args):
return "{}/{}".format(inet_ntop(AF_INET6, args.prefix),
args.len)
def format_vl_api_ip6_prefix_t(args):
p, length = args.split('/')
return {'prefix': inet_pton(AF_INET6, p),
'len': int(length)}
@staticmethod
def format_vl_api_ip4_prefix_t(args):
prefix, len = args.split('/')
return {'prefix': {'address': inet_pton(AF_INET, prefix)},
'len': int(len)}
@staticmethod
def unformat_vl_api_ip4_prefix_t(args):
return "{}/{}".format(inet_ntop(AF_INET, args.prefix),
args.len)
def format_vl_api_ip4_prefix_t(args):
p, length = args.split('/')
return {'prefix': inet_pton(AF_INET, p),
'len': int(length)}
@staticmethod
def format_vl_api_ip6_address_t(args):
return {'address': inet_pton(AF_INET6, args)}
@staticmethod
def format_vl_api_ip4_address_t(args):
return {'address': inet_pton(AF_INET, args)}
conversion_table = {
'vl_api_ip6_address_t':
{
'IPv6Address': lambda o: o.packed,
'str': lambda s: inet_pton(AF_INET6, s)
},
'vl_api_ip4_address_t':
{
'IPv4Address': lambda o: o.packed,
'str': lambda s: inet_pton(AF_INET, s)
},
'vl_api_ip6_prefix_t':
{
'IPv6Network': lambda o: {'prefix': o.network_address.packed,
'len': o.prefixlen},
'str': lambda s: format_vl_api_ip6_prefix_t(s)
},
'vl_api_ip4_prefix_t':
{
'IPv4Network': lambda o: {'prefix': o.network_address.packed,
'len': o.prefixlen},
'str': lambda s: format_vl_api_ip4_prefix_t(s)
},
'vl_api_address_t':
{
'IPv4Address': lambda o: {'af': ADDRESS_IP4, 'un': {'ip4': o.packed}},
'IPv6Address': lambda o: {'af': ADDRESS_IP6, 'un': {'ip6': o.packed}},
'str': lambda s: format_vl_api_address_t(s)
},
'vl_api_prefix_t':
{
'IPv4Network': lambda o: {'prefix':
{'af': ADDRESS_IP4, 'un':
{'ip4': o.network_address.packed}},
'len': o.prefixlen},
'IPv6Network': lambda o: {'prefix':
{'af': ADDRESS_IP6, 'un':
{'ip6': o.network_address.packed}},
'len': o.prefixlen},
'str': lambda s: format_vl_api_prefix_t(s)
},
}
@staticmethod
def format_vl_api_address_t(args):
try:
return {'un': {'ip6': inet_pton(AF_INET6, args)},
'af': int(1)}
except socket.error as e:
return {'un': {'ip4': inet_pton(AF_INET, args)},
'af': int(0)}
@staticmethod
def unformat_vl_api_address_t(arg):
if arg.af == 1:
return inet_ntop(AF_INET6, arg.un.ip6)
if arg.af == 0:
return inet_ntop(AF_INET, arg.un.ip4)
raise VPPFormatError
def unformat_api_address_t(o):
if o.af == 1:
return ipaddress.IPv6Address(o.un.ip6)
if o.af == 0:
return ipaddress.IPv4Address(o.un.ip4)
@staticmethod
def format_vl_api_prefix_t(args):
prefix, len = args.split('/')
return {'address': VPPFormat.format_vl_api_address_t(prefix),
'address_length': int(len)}
@staticmethod
def unformat_vl_api_prefix_t(arg):
if arg.address.af == 1:
return "{}/{}".format(inet_ntop(AF_INET6,
arg.address.un.ip6),
arg.address_length)
if arg.address.af == 0:
return "{}/{}".format(inet_ntop(AF_INET,
arg.address.un.ip4),
arg.address_length)
raise VPPFormatError
def unformat_api_prefix_t(o):
if isinstance(o.address, ipaddress.IPv4Address):
return ipaddress.IPv4Network((o.address, o.address_length), False)
if isinstance(o.address, ipaddress.IPv6Address):
return ipaddress.IPv6Network((o.address, o.address_length), False)
@staticmethod
def format_u8(args):
try:
return int(args)
except Exception as e:
return args.encode()
@staticmethod
def format(typename, args):
try:
return getattr(VPPFormat, 'format_' + typename)(args)
except AttributeError:
# Default
return (int(args))
@staticmethod
def unformat_bytes(args):
try:
return args.decode('utf-8')
except ValueError as e:
return args
@staticmethod
def unformat_list(args):
s = '['
for f in args:
t = type(f).__name__
if type(f) is int:
s2 = str(f)
else:
s2 = VPPFormat.unformat_t(t, f)
s += '{} '.format(s2)
return s[:-1] + ']'
@staticmethod
def unformat(args):
s = ''
return VPPFormat.unformat_t(type(args).__name__, args)
'''
for i, f in enumerate(args):
print('F', f)
t = type(f).__name__
if type(f) is int:
s2 = str(f)
else:
s2 = VPPFormat.unformat_t(t, f)
s += '{} {} '.format(args._fields[i], s2)
return s[:-1]
'''
@staticmethod
def unformat_t(typename, args):
try:
return getattr(VPPFormat, 'unformat_' + typename)(args)
except AttributeError:
# Type without explicit override
return VPPFormat.unformat(args)
# Default handling
return args
conversion_unpacker_table = {
'vl_api_ip6_address_t': lambda o: ipaddress.IPv6Address(o),
'vl_api_ip6_prefix_t': lambda o: ipaddress.IPv6Network((o.prefix, o.len)),
'vl_api_ip4_address_t': lambda o: ipaddress.IPv4Address(o),
'vl_api_ip4_prefix_t': lambda o: ipaddress.IPv4Network((o.prefix, o.len)),
'vl_api_address_t': lambda o: unformat_api_address_t(o),
'vl_api_prefix_t': lambda o: unformat_api_prefix_t(o),
}

View File

@ -28,7 +28,6 @@ import weakref
import atexit
from . vpp_serializer import VPPType, VPPEnumType, VPPUnionType, BaseTypes
from . vpp_serializer import VPPMessage, vpp_get_type, VPPTypeAlias
from . vpp_format import VPPFormat
if sys.version[0] == '2':
import Queue as queue
@ -56,11 +55,11 @@ def vpp_atexit(vpp_weakref):
vpp_instance.logger.debug('Cleaning up VPP on exit')
vpp_instance.disconnect()
def vpp_iterator(d):
if sys.version[0] == '2':
if sys.version[0] == '2':
def vpp_iterator(d):
return d.iteritems()
else:
else:
def vpp_iterator(d):
return d.items()
@ -409,7 +408,8 @@ class VPP(object):
# Create function for client side messages.
if name in self.services:
if 'stream' in self.services[name] and self.services[name]['stream']:
if 'stream' in self.services[name] and \
self.services[name]['stream']:
multipart = True
else:
multipart = False
@ -493,7 +493,7 @@ class VPP(object):
else:
raise VPPIOError(2, 'RPC reply message received in event handler')
def decode_incoming_msg(self, msg):
def decode_incoming_msg(self, msg, no_type_conversion=False):
if not msg:
self.logger.warning('vpp_api.read failed')
return
@ -508,7 +508,7 @@ class VPP(object):
if not msgobj:
raise VPPIOError(2, 'Reply message undefined')
r, size = msgobj.unpack(msg)
r, size = msgobj.unpack(msg, ntc=no_type_conversion)
return r
def msg_handler_async(self, msg):
@ -535,7 +535,7 @@ class VPP(object):
d = set(kwargs.keys()) - set(msg.field_by_name.keys())
if d:
raise VPPValueError('Invalid argument {} to {}'
.format(list(d), msg.name))
.format(list(d), msg.name))
def _call_vpp(self, i, msg, multipart, **kwargs):
"""Given a message, send the message and await a reply.
@ -560,6 +560,8 @@ class VPP(object):
context = kwargs['context']
kwargs['_vl_msg_id'] = i
no_type_conversion = kwargs.pop('_no_type_conversion', False)
try:
if self.transport.socket_index:
kwargs['client_index'] = self.transport.socket_index
@ -582,7 +584,7 @@ class VPP(object):
msg = self.transport.read()
if not msg:
raise VPPIOError(2, 'VPP API client: read failed')
r = self.decode_incoming_msg(msg)
r = self.decode_incoming_msg(msg, no_type_conversion)
msgname = type(r).__name__
if context not in r or r.context == 0 or context != r.context:
# Message being queued

View File

@ -17,7 +17,10 @@ import struct
import collections
from enum import IntEnum
import logging
from .vpp_format import VPPFormat
from . import vpp_format
import ipaddress
import sys
import socket
#
# Set log-level in application by doing e.g.:
@ -26,6 +29,32 @@ from .vpp_format import VPPFormat
#
logger = logging.getLogger(__name__)
if sys.version[0] == '2':
check = lambda d: type(d) is dict
else:
check = lambda d: type(d) is dict or type(d) is bytes
def conversion_required(data, field_type):
if check(data):
return False
try:
if type(data).__name__ in vpp_format.conversion_table[field_type]:
return True
except KeyError:
return False
def conversion_packer(data, field_type):
t = type(data).__name__
return types[field_type].pack(vpp_format.
conversion_table[field_type][t](data))
def conversion_unpacker(data, field_type):
if field_type not in vpp_format.conversion_unpacker_table:
return data
return vpp_format.conversion_unpacker_table[field_type](data)
class BaseTypes(object):
def __init__(self, type, elements=0):
@ -51,7 +80,7 @@ class BaseTypes(object):
data = 0
return self.packer.pack(data)
def unpack(self, data, offset, result=None):
def unpack(self, data, offset, result=None, ntc=False):
return self.packer.unpack_from(data, offset)[0], self.packer.size
@ -79,19 +108,21 @@ class FixedList_u8(object):
self.packer = BaseTypes(field_type, num)
self.size = self.packer.size
def pack(self, list, kwargs=None):
def pack(self, data, kwargs=None):
"""Packs a fixed length bytestring. Left-pads with zeros
if input data is too short."""
if not list:
if not data:
return b'\x00' * self.size
if len(list) > self.num:
if len(data) > self.num:
raise VPPSerializerValueError(
'Fixed list length error for "{}", got: {}'
' expected: {}'
.format(self.name, len(list), self.num))
return self.packer.pack(list)
.format(self.name, len(data), self.num))
def unpack(self, data, offset=0, result=None):
return self.packer.pack(data)
def unpack(self, data, offset=0, result=None, ntc=False):
if len(data[offset:]) < self.num:
raise VPPSerializerValueError(
'Invalid array length for "{}" got {}'
@ -105,6 +136,8 @@ class FixedList(object):
self.num = num
self.packer = types[field_type]
self.size = self.packer.size * num
self.name = name
self.field_type = field_type
def pack(self, list, kwargs):
if len(list) != self.num:
@ -116,12 +149,12 @@ class FixedList(object):
b += self.packer.pack(e)
return b
def unpack(self, data, offset=0, result=None):
def unpack(self, data, offset=0, result=None, ntc=False):
# Return a list of arguments
result = []
total = 0
for e in range(self.num):
x, size = self.packer.unpack(data, offset)
x, size = self.packer.unpack(data, offset, ntc=ntc)
result.append(x)
offset += size
total += size
@ -153,7 +186,7 @@ class VLAList(object):
b += self.packer.pack(e)
return b
def unpack(self, data, offset=0, result=None):
def unpack(self, data, offset=0, result=None, ntc=False):
# Return a list of arguments
total = 0
@ -162,11 +195,11 @@ class VLAList(object):
if result[self.index] == 0:
return b'', 0
p = BaseTypes('u8', result[self.index])
return p.unpack(data, offset)
return p.unpack(data, offset, ntc=ntc)
r = []
for e in range(result[self.index]):
x, size = self.packer.unpack(data, offset)
x, size = self.packer.unpack(data, offset, ntc=ntc)
r.append(x)
offset += size
total += size
@ -187,7 +220,7 @@ class VLAList_legacy():
b += self.packer.pack(e)
return b
def unpack(self, data, offset=0, result=None):
def unpack(self, data, offset=0, result=None, ntc=False):
total = 0
# Return a list of arguments
if (len(data) - offset) % self.packer.size:
@ -196,7 +229,7 @@ class VLAList_legacy():
elements = int((len(data) - offset) / self.packer.size)
r = []
for e in range(elements):
x, size = self.packer.unpack(data, offset)
x, size = self.packer.unpack(data, offset, ntc=ntc)
r.append(x)
offset += self.packer.size
total += size
@ -227,7 +260,7 @@ class VPPEnumType(object):
def pack(self, data, kwargs=None):
return types['u32'].pack(data)
def unpack(self, data, offset=0, result=None):
def unpack(self, data, offset=0, result=None, ntc=False):
x, size = types['u32'].unpack(data, offset)
return self.enum(x), size
@ -272,31 +305,53 @@ class VPPUnionType(object):
r[:len(b)] = b
return r
def unpack(self, data, offset=0, result=None):
def unpack(self, data, offset=0, result=None, ntc=False):
r = []
maxsize = 0
for k, p in self.packers.items():
x, size = p.unpack(data, offset)
x, size = p.unpack(data, offset, ntc=ntc)
if size > maxsize:
maxsize = size
r.append(x)
return self.tuple._make(r), maxsize
def VPPTypeAlias(name, msgdef):
t = vpp_get_type(msgdef['type'])
if not t:
raise ValueError()
if 'length' in msgdef:
if msgdef['length'] == 0:
class VPPTypeAlias(object):
def __init__(self, name, msgdef):
self.name = name
t = vpp_get_type(msgdef['type'])
if not t:
raise ValueError()
if msgdef['type'] == 'u8':
types[name] = FixedList_u8(name, msgdef['type'],
msgdef['length'])
if 'length' in msgdef:
if msgdef['length'] == 0:
raise ValueError()
if msgdef['type'] == 'u8':
self.packer = FixedList_u8(name, msgdef['type'],
msgdef['length'])
self.size = self.packer.size
else:
self.packer = FixedList(name, msgdef['type'], msgdef['length'])
else:
types[name] = FixedList(name, msgdef['type'], msgdef['length'])
else:
types[name] = t
self.packer = t
self.size = t.size
types[name] = self
def pack(self, data, kwargs=None):
if data and conversion_required(data, self.name):
try:
return conversion_packer(data, self.name)
# Python 2 and 3 raises different exceptions from inet_pton
except(OSError, socket.error, TypeError):
pass
return self.packer.pack(data, kwargs)
def unpack(self, data, offset=0, result=None, ntc=False):
t, size = self.packer.unpack(data, offset, result, ntc=ntc)
if not ntc:
return conversion_unpacker(t, self.name), size
return t, size
class VPPType(object):
@ -352,9 +407,12 @@ class VPPType(object):
if not kwargs:
kwargs = data
b = bytes()
for i, a in enumerate(self.fields):
# Try one of the format functions
# Try one of the format functions
if data and conversion_required(data, self.name):
return conversion_packer(data, self.name)
for i, a in enumerate(self.fields):
if data and type(data) is not dict and a not in data:
raise VPPSerializerValueError(
"Invalid argument: {} expected {}.{}".
@ -367,33 +425,27 @@ class VPPType(object):
else:
arg = data[a]
kwarg = kwargs[a] if a in kwargs else None
if isinstance(self.packers[i], VPPType):
try:
b += self.packers[i].pack(arg, kwarg)
except ValueError:
# Invalid argument, can we convert it?
arg = VPPFormat.format(self.packers[i].name, data[a])
data[a] = arg
kwarg = arg
b += self.packers[i].pack(arg, kwarg)
b += self.packers[i].pack(arg, kwarg)
else:
b += self.packers[i].pack(arg, kwargs)
return b
def unpack(self, data, offset=0, result=None):
def unpack(self, data, offset=0, result=None, ntc=False):
# Return a list of arguments
result = []
total = 0
for p in self.packers:
x, size = p.unpack(data, offset, result)
x, size = p.unpack(data, offset, result, ntc)
if type(x) is tuple and len(x) == 1:
x = x[0]
result.append(x)
offset += size
total += size
t = self.tuple._make(result)
if not ntc:
t = conversion_unpacker(t, self.name)
return t, total

View File

@ -14,7 +14,7 @@ from framework import VppTestCase, VppTestRunner
from util import ppp
from vpp_ip_route import VppIpRoute, VppRoutePath, VppIpMRoute, \
VppMRoutePath, MRouteItfFlags, MRouteEntryFlags, VppMplsIpBind, \
VppMplsTable, VppIpTable, VppIpAddress
VppMplsTable, VppIpTable
from vpp_sub_interface import VppSubInterface, VppDot1QSubint, VppDot1ADSubint
@ -1123,7 +1123,7 @@ class TestIPPunt(VppTestCase):
#
# Configure a punt redirect via pg1.
#
nh_addr = VppIpAddress(self.pg1.remote_ip4).encode()
nh_addr = self.pg1.remote_ip4
self.vapi.ip_punt_redirect(self.pg0.sw_if_index,
self.pg1.sw_if_index,
nh_addr)
@ -1187,7 +1187,7 @@ class TestIPPunt(VppTestCase):
#
# Configure a punt redirects
#
nh_address = VppIpAddress(self.pg3.remote_ip4).encode()
nh_address = self.pg3.remote_ip4
self.vapi.ip_punt_redirect(self.pg0.sw_if_index,
self.pg3.sw_if_index,
nh_address)
@ -1196,7 +1196,7 @@ class TestIPPunt(VppTestCase):
nh_address)
self.vapi.ip_punt_redirect(self.pg2.sw_if_index,
self.pg3.sw_if_index,
VppIpAddress('0.0.0.0').encode())
'0.0.0.0')
#
# Dump pg0 punt redirects
@ -1212,8 +1212,8 @@ class TestIPPunt(VppTestCase):
self.assertEqual(len(punts), 3)
for p in punts:
self.assertEqual(p.punt.tx_sw_if_index, self.pg3.sw_if_index)
self.assertNotEqual(punts[1].punt.nh.un.ip4, self.pg3.remote_ip4)
self.assertEqual(punts[2].punt.nh.un.ip4, '\x00'*4)
self.assertNotEqual(punts[1].punt.nh, self.pg3.remote_ip4)
self.assertEqual(str(punts[2].punt.nh), '0.0.0.0')
class TestIPDeag(VppTestCase):

View File

@ -21,7 +21,7 @@ from util import ppp, ip6_normalize
from vpp_ip import DpoProto
from vpp_ip_route import VppIpRoute, VppRoutePath, find_route, VppIpMRoute, \
VppMRoutePath, MRouteItfFlags, MRouteEntryFlags, VppMplsIpBind, \
VppMplsRoute, VppMplsTable, VppIpTable, VppIpAddress
VppMplsRoute, VppMplsTable, VppIpTable
from vpp_neighbor import find_nbr, VppNeighbor
from vpp_pg_interface import is_ipv6_misc
from vpp_sub_interface import VppSubInterface, VppDot1QSubint
@ -1949,7 +1949,7 @@ class TestIP6Punt(VppTestCase):
#
# Configure a punt redirect via pg1.
#
nh_addr = VppIpAddress(self.pg1.remote_ip6).encode()
nh_addr = self.pg1.remote_ip6
self.vapi.ip_punt_redirect(self.pg0.sw_if_index,
self.pg1.sw_if_index,
nh_addr)
@ -2013,7 +2013,7 @@ class TestIP6Punt(VppTestCase):
#
# Configure a punt redirects
#
nh_addr = VppIpAddress(self.pg3.remote_ip6).encode()
nh_addr = self.pg3.remote_ip6
self.vapi.ip_punt_redirect(self.pg0.sw_if_index,
self.pg3.sw_if_index,
nh_addr)
@ -2022,7 +2022,7 @@ class TestIP6Punt(VppTestCase):
nh_addr)
self.vapi.ip_punt_redirect(self.pg2.sw_if_index,
self.pg3.sw_if_index,
VppIpAddress('0::0').encode())
'0::0')
#
# Dump pg0 punt redirects
@ -2039,8 +2039,8 @@ class TestIP6Punt(VppTestCase):
self.assertEqual(len(punts), 3)
for p in punts:
self.assertEqual(p.punt.tx_sw_if_index, self.pg3.sw_if_index)
self.assertNotEqual(punts[1].punt.nh.un.ip6, self.pg3.remote_ip6)
self.assertEqual(punts[2].punt.nh.un.ip6, '\x00'*16)
self.assertNotEqual(punts[1].punt.nh, self.pg3.remote_ip6)
self.assertEqual(str(punts[2].punt.nh), '::')
class TestIPDeag(VppTestCase):

View File

@ -21,7 +21,6 @@ from scapy.layers.inet6 import IPv6, UDP, ICMPv6ND_NS, ICMPv6ND_RS, \
from framework import VppTestCase, VppTestRunner
from util import Host, ppp
from vpp_mac import VppMacAddress, mactobinary
from vpp_ip import VppIpAddress
class TestL2bdArpTerm(VppTestCase):
@ -74,7 +73,7 @@ class TestL2bdArpTerm(VppTestCase):
ip = e.ip4 if is_ipv6 == 0 else e.ip6
self.vapi.bd_ip_mac_add_del(bd_id=bd_id,
mac=VppMacAddress(e.mac).encode(),
ip=VppIpAddress(ip).encode(),
ip=ip,
is_ipv6=is_ipv6,
is_add=is_add)

View File

@ -6,7 +6,7 @@ import socket
from framework import VppTestCase, VppTestRunner
from vpp_ip import *
from vpp_ip_route import VppIpRoute, VppRoutePath
from ipaddress import IPv6Network, IPv4Network
from scapy.layers.l2 import Ether, Raw
from scapy.layers.inet import IP, UDP, ICMP, TCP, fragment
from scapy.layers.inet6 import IPv6, ICMPv6TimeExceeded
@ -76,9 +76,9 @@ class TestMAP(VppTestCase):
#
# Add a domain that maps from pg0 to pg1
#
map_dst = VppIp6Prefix(map_br_pfx, map_br_pfx_len).encode()
map_src = VppIp6Prefix("3000::1", 128).encode()
client_pfx = VppIp4Prefix("192.168.0.0", 16).encode()
map_dst = '{}/{}'.format(map_br_pfx, map_br_pfx_len)
map_src = '3000::1/128'
client_pfx = '192.168.0.0/16'
self.vapi.map_add_domain(map_dst, map_src, client_pfx)
#
@ -168,10 +168,10 @@ class TestMAP(VppTestCase):
#
# Add a domain that maps from pg0 to pg1
#
map_dst = VppIp6Prefix("2001:db8::", 32).encode()
map_src = VppIp6Prefix("1234:5678:90ab:cdef::", 64).encode()
ip4_pfx = VppIp4Prefix("192.168.0.0", 24).encode()
self.vapi.map_add_domain(map_dst, map_src, ip4_pfx, 16, 6, 4, 1)
self.vapi.map_add_domain('2001:db8::/32',
'1234:5678:90ab:cdef::/64',
'192.168.0.0/24',
16, 6, 4, 1)
# Enable MAP-T on interfaces.

View File

@ -94,10 +94,10 @@ class TestSyslog(VppTestCase):
""" Syslog Protocol test """
self.vapi.syslog_set_sender(self.pg0.remote_ip4n, self.pg0.local_ip4n)
config = self.vapi.syslog_get_sender()
self.assertEqual(config.collector_address,
self.pg0.remote_ip4n)
self.assertEqual(str(config.collector_address),
self.pg0.remote_ip4)
self.assertEqual(config.collector_port, 514)
self.assertEqual(config.src_address, self.pg0.local_ip4n)
self.assertEqual(str(config.src_address), self.pg0.local_ip4)
self.assertEqual(config.vrf_id, 0)
self.assertEqual(config.max_msg_size, 480)

View File

@ -16,8 +16,7 @@ class IGMP_FILTER:
def find_igmp_state(states, itf, gaddr, saddr):
for s in states:
if s.sw_if_index == itf.sw_if_index and \
s.gaddr == socket.inet_pton(socket.AF_INET, gaddr) and \
s.saddr == socket.inet_pton(socket.AF_INET, saddr):
str(s.gaddr) == gaddr and str(s.saddr) == saddr:
return True
return False
@ -25,8 +24,7 @@ def find_igmp_state(states, itf, gaddr, saddr):
def wait_for_igmp_event(test, timeout, itf, gaddr, saddr, ff):
ev = test.vapi.wait_for_event(timeout, "igmp_event")
if ev.sw_if_index == itf.sw_if_index and \
ev.gaddr == socket.inet_pton(socket.AF_INET, gaddr) and \
ev.saddr == socket.inet_pton(socket.AF_INET, saddr) and \
str(ev.gaddr) == gaddr and str(ev.saddr) == saddr and \
ev.filter == ff:
return True
return False

View File

@ -192,20 +192,6 @@ class VppIpPrefix():
return False
class VppIp6Prefix():
def __init__(self, prefix, prefixlen):
self.ip_prefix = ip_address(unicode(prefix))
self.prefixlen = prefixlen
def encode(self):
return {'prefix': self.ip_prefix.packed,
'len': self.prefixlen}
class VppIp4Prefix(VppIp6Prefix):
pass
class VppIpMPrefix():
def __init__(self, saddr, gaddr, len):
self.saddr = saddr

View File

@ -2821,7 +2821,8 @@ class VppPapiProvider(object):
def vxlan_gbp_tunnel_dump(self, sw_if_index=0xffffffff):
return self.api(self.papi.vxlan_gbp_tunnel_dump,
{'sw_if_index': sw_if_index})
{'sw_if_index': sw_if_index,
'_no_type_conversion': True})
def pppoe_add_del_session(
self,
@ -3553,7 +3554,8 @@ class VppPapiProvider(object):
def gbp_endpoint_dump(self):
""" GBP endpoint Dump """
return self.api(self.papi.gbp_endpoint_dump, {})
return self.api(self.papi.gbp_endpoint_dump,
{'_no_type_conversion': True})
def gbp_endpoint_group_add(self, epg, bd,
rd, uplink_sw_if_index):
@ -3665,7 +3667,8 @@ class VppPapiProvider(object):
def gbp_subnet_dump(self):
""" GBP Subnet Dump """
return self.api(self.papi.gbp_subnet_dump, {})
return self.api(self.papi.gbp_subnet_dump,
{'_no_type_conversion': True})
def gbp_contract_add_del(self, is_add, src_epg, dst_epg, acl_index, rules):
""" GBP contract Add/Del """

View File

@ -5,15 +5,14 @@
from vpp_object import *
from socket import inet_pton, inet_ntop, AF_INET, AF_INET6
from vpp_ip import *
def find_udp_encap(test, ue):
encaps = test.vapi.udp_encap_dump()
for e in encaps:
if ue.id == e.udp_encap.id \
and ue.src_ip == e.udp_encap.src_ip \
and ue.dst_ip == e.udp_encap.dst_ip \
and ue.src_ip == str(e.udp_encap.src_ip) \
and ue.dst_ip == str(e.udp_encap.dst_ip) \
and e.udp_encap.dst_port == ue.dst_port \
and e.udp_encap.src_port == ue.src_port:
return True
@ -34,15 +33,15 @@ class VppUdpEncap(VppObject):
self.table_id = table_id
self.src_ip_s = src_ip
self.dst_ip_s = dst_ip
self.src_ip = VppIpAddress(src_ip)
self.dst_ip = VppIpAddress(dst_ip)
self.src_ip = src_ip
self.dst_ip = dst_ip
self.src_port = src_port
self.dst_port = dst_port
def add_vpp_config(self):
r = self._test.vapi.udp_encap_add(
self.src_ip.encode(),
self.dst_ip.encode(),
self.src_ip,
self.dst_ip,
self.src_port,
self.dst_port,
self.table_id)