PAPI: Unpack embedded types with variable length arrays.
Change-Id: Ic952ed5b837ac8409fd95e2b5cb92eb028ba0c40 Signed-off-by: Ole Troan <ot@cisco.com>
This commit is contained in:
@ -16,8 +16,8 @@ class TestAddType(unittest.TestCase):
|
||||
['u32', 'is_int']])
|
||||
|
||||
b = un.pack({'is_int': 0x12345678})
|
||||
self.assertEqual(len(b), 4)
|
||||
nt = un.unpack(b)
|
||||
nt, size = un.unpack(b)
|
||||
self.assertEqual(len(b), size)
|
||||
self.assertEqual(nt.is_bool, 0x12)
|
||||
self.assertEqual(nt.is_int, 0x12345678)
|
||||
|
||||
@ -31,12 +31,23 @@ class TestAddType(unittest.TestCase):
|
||||
[["vl_api_ip4_address_t", "ip4"],
|
||||
["vl_api_ip6_address_t", "ip6"]])
|
||||
|
||||
address = VPPType('address', [['vl_api_address_family_t', 'af'],
|
||||
['vl_api_address_union_t', 'un']])
|
||||
address = VPPType('vl_api_address_t',
|
||||
[['vl_api_address_family_t', 'af'],
|
||||
['vl_api_address_union_t', 'un']])
|
||||
|
||||
va_address_list = VPPType('list_addresses',
|
||||
[['u8', 'count'],
|
||||
['vl_api_address_t', 'addresses',
|
||||
0, 'count']])
|
||||
|
||||
message_with_va_address_list = VPPType('msg_with_vla',
|
||||
[['list_addresses',
|
||||
'vla_address'],
|
||||
['u8', 'is_cool']])
|
||||
|
||||
b = ip4.pack({'address': inet_pton(AF_INET, '1.1.1.1')})
|
||||
self.assertEqual(len(b), 4)
|
||||
nt = ip4.unpack(b)
|
||||
nt, size = ip4.unpack(b)
|
||||
self.assertEqual(nt.address, inet_pton(AF_INET, '1.1.1.1'))
|
||||
|
||||
b = ip6.pack({'address': inet_pton(AF_INET6, '1::1')})
|
||||
@ -48,19 +59,45 @@ class TestAddType(unittest.TestCase):
|
||||
{'address': inet_pton(AF_INET, '2.2.2.2')}}})
|
||||
self.assertEqual(len(b), 20)
|
||||
|
||||
nt = address.unpack(b)
|
||||
nt, size = address.unpack(b)
|
||||
self.assertEqual(nt.af, af.ADDRESS_IP4)
|
||||
self.assertEqual(nt.un.ip4.address,
|
||||
inet_pton(AF_INET, '2.2.2.2'))
|
||||
self.assertEqual(nt.un.ip6.address,
|
||||
inet_pton(AF_INET6, '0202:0202::'))
|
||||
|
||||
# List of addresses
|
||||
address_list = []
|
||||
for i in range(4):
|
||||
address_list.append({'af': af.ADDRESS_IP4,
|
||||
'un':
|
||||
{'ip4':
|
||||
{'address': inet_pton(AF_INET, '2.2.2.2')}}})
|
||||
b = va_address_list.pack({'count': len(address_list),
|
||||
'addresses': address_list})
|
||||
self.assertEqual(len(b), 81)
|
||||
|
||||
nt, size = va_address_list.unpack(b)
|
||||
self.assertEqual(nt.addresses[0].un.ip4.address,
|
||||
inet_pton(AF_INET, '2.2.2.2'))
|
||||
|
||||
b = message_with_va_address_list.pack({'vla_address':
|
||||
{'count': len(address_list),
|
||||
'addresses': address_list},
|
||||
'is_cool': 100})
|
||||
self.assertEqual(len(b), 82)
|
||||
nt, size = message_with_va_address_list.unpack(b)
|
||||
self.assertEqual(nt.is_cool, 100)
|
||||
|
||||
def test_arrays(self):
|
||||
# Test cases
|
||||
# 1. Fixed list
|
||||
# 2. Fixed list of variable length sub type
|
||||
# 3. Variable length type
|
||||
#
|
||||
s = VPPType('str', [['u32', 'length'],
|
||||
['u8', 'string', 0, 'length']])
|
||||
|
||||
ip4 = VPPType('ip4_address', [['u8', 'address', 4]])
|
||||
listip4 = VPPType('list_ip4_t', [['ip4_address', 'addresses', 4]])
|
||||
valistip4 = VPPType('list_ip4_t',
|
||||
@ -76,26 +113,39 @@ class TestAddType(unittest.TestCase):
|
||||
addresses.append({'address': inet_pton(AF_INET, '2.2.2.2')})
|
||||
b = listip4.pack({'addresses': addresses})
|
||||
self.assertEqual(len(b), 16)
|
||||
nt = listip4.unpack(b)
|
||||
|
||||
nt, size = listip4.unpack(b)
|
||||
self.assertEqual(nt.addresses[0].address,
|
||||
inet_pton(AF_INET, '2.2.2.2'))
|
||||
|
||||
b = valistip4.pack({'count': len(addresses), 'addresses': addresses})
|
||||
self.assertEqual(len(b), 17)
|
||||
|
||||
nt = valistip4.unpack(b)
|
||||
nt, size = valistip4.unpack(b)
|
||||
self.assertEqual(nt.count, 4)
|
||||
self.assertEqual(nt.addresses[0].address,
|
||||
inet_pton(AF_INET, '2.2.2.2'))
|
||||
|
||||
b = valistip4_legacy.pack({'foo': 1, 'addresses': addresses})
|
||||
self.assertEqual(len(b), 17)
|
||||
nt = valistip4_legacy.unpack(b)
|
||||
nt, size = valistip4_legacy.unpack(b)
|
||||
self.assertEqual(len(nt.addresses), 4)
|
||||
self.assertEqual(nt.addresses[0].address,
|
||||
inet_pton(AF_INET, '2.2.2.2'))
|
||||
|
||||
string = 'foobar foobar'
|
||||
b = s.pack({'length': len(string), 'string': string})
|
||||
nt, size = s.unpack(b)
|
||||
self.assertEqual(len(b), size)
|
||||
|
||||
def test_string(self):
|
||||
s = VPPType('str', [['u32', 'length'],
|
||||
['u8', 'string', 0, 'length']])
|
||||
|
||||
string = ''
|
||||
b = s.pack({'length': len(string), 'string': string})
|
||||
nt, size = s.unpack(b)
|
||||
self.assertEqual(len(b), size)
|
||||
|
||||
def test_message(self):
|
||||
foo = VPPMessage('foo', [['u16', '_vl_msg_id'],
|
||||
['u8', 'client_index'],
|
||||
@ -103,8 +153,8 @@ class TestAddType(unittest.TestCase):
|
||||
{"crc": "0x559b9f3c"}])
|
||||
b = foo.pack({'_vl_msg_id': 1, 'client_index': 5,
|
||||
'something': 200})
|
||||
self.assertEqual(len(b), 4)
|
||||
nt = foo.unpack(b)
|
||||
nt, size = foo.unpack(b)
|
||||
self.assertEqual(len(b), size)
|
||||
self.assertEqual(nt.something, 200)
|
||||
|
||||
def test_abf(self):
|
||||
@ -189,7 +239,7 @@ class TestAddType(unittest.TestCase):
|
||||
'_vl_msg_id': 1066,
|
||||
'policy': policy})
|
||||
|
||||
nt = abf_policy_add_del.unpack(b)
|
||||
nt, size = abf_policy_add_del.unpack(b)
|
||||
self.assertEqual(nt.policy.paths[0].next_hop,
|
||||
b'\x10\x02\x02\xac\x00\x00\x00\x00'
|
||||
b'\x00\x00\x00\x00\x00\x00\x00\x00')
|
||||
|
@ -523,8 +523,7 @@ class VPP():
|
||||
if not msg:
|
||||
self.logger.warning('vpp_api.read failed')
|
||||
return
|
||||
|
||||
i, ci = self.header.unpack(msg, 0)
|
||||
(i, ci), size = self.header.unpack(msg, 0)
|
||||
if self.id_names[i] == 'rx_thread_exit':
|
||||
return
|
||||
|
||||
@ -535,8 +534,7 @@ class VPP():
|
||||
if not msgobj:
|
||||
raise IOError(2, 'Reply message undefined')
|
||||
|
||||
r = msgobj.unpack(msg)
|
||||
|
||||
r, size = msgobj.unpack(msg)
|
||||
return r
|
||||
|
||||
def msg_handler_async(self, msg):
|
||||
|
@ -48,7 +48,7 @@ class BaseTypes():
|
||||
return self.packer.pack(data)
|
||||
|
||||
def unpack(self, data, offset, result=None):
|
||||
return self.packer.unpack_from(data, offset)[0]
|
||||
return self.packer.unpack_from(data, offset)[0], self.packer.size
|
||||
|
||||
|
||||
types = {}
|
||||
@ -102,15 +102,18 @@ class FixedList():
|
||||
def unpack(self, data, offset=0, result=None):
|
||||
# Return a list of arguments
|
||||
result = []
|
||||
total = 0
|
||||
for e in range(self.num):
|
||||
x = self.packer.unpack(data, offset)
|
||||
x, size = self.packer.unpack(data, offset)
|
||||
result.append(x)
|
||||
offset += self.packer.size
|
||||
return result
|
||||
offset += size
|
||||
total += size
|
||||
return result, total
|
||||
|
||||
|
||||
class VLAList():
|
||||
def __init__(self, name, field_type, len_field_name, index):
|
||||
self.name = name
|
||||
self.index = index
|
||||
self.packer = types[field_type]
|
||||
self.size = self.packer.size
|
||||
@ -132,21 +135,22 @@ class VLAList():
|
||||
|
||||
def unpack(self, data, offset=0, result=None):
|
||||
# Return a list of arguments
|
||||
total = 0
|
||||
|
||||
# u8 array
|
||||
if self.packer.size == 1:
|
||||
if result[self.index] == 0:
|
||||
return b''
|
||||
return b'', 0
|
||||
p = BaseTypes('u8', result[self.index])
|
||||
r = p.unpack(data, offset)
|
||||
return r
|
||||
return p.unpack(data, offset)
|
||||
|
||||
r = []
|
||||
for e in range(result[self.index]):
|
||||
x = self.packer.unpack(data, offset)
|
||||
x, size = self.packer.unpack(data, offset)
|
||||
r.append(x)
|
||||
offset += self.packer.size
|
||||
return r
|
||||
offset += size
|
||||
total += size
|
||||
return r, total
|
||||
|
||||
|
||||
class VLAList_legacy():
|
||||
@ -164,16 +168,18 @@ class VLAList_legacy():
|
||||
return b
|
||||
|
||||
def unpack(self, data, offset=0, result=None):
|
||||
total = 0
|
||||
# Return a list of arguments
|
||||
if (len(data) - offset) % self.packer.size:
|
||||
raise ValueError('Legacy Variable Length Array length mismatch.')
|
||||
elements = int((len(data) - offset) / self.packer.size)
|
||||
r = []
|
||||
for e in range(elements):
|
||||
x = self.packer.unpack(data, offset)
|
||||
x, size = self.packer.unpack(data, offset)
|
||||
r.append(x)
|
||||
offset += self.packer.size
|
||||
return r
|
||||
total += size
|
||||
return r, total
|
||||
|
||||
|
||||
class VPPEnumType():
|
||||
@ -198,8 +204,8 @@ class VPPEnumType():
|
||||
return types['u32'].pack(data, kwargs)
|
||||
|
||||
def unpack(self, data, offset=0, result=None):
|
||||
x = types['u32'].unpack(data, offset)
|
||||
return self.enum(x)
|
||||
x, size = types['u32'].unpack(data, offset)
|
||||
return self.enum(x), size
|
||||
|
||||
|
||||
class VPPUnionType():
|
||||
@ -239,9 +245,13 @@ class VPPUnionType():
|
||||
|
||||
def unpack(self, data, offset=0, result=None):
|
||||
r = []
|
||||
maxsize = 0
|
||||
for k, p in self.packers.items():
|
||||
r.append(p.unpack(data, offset))
|
||||
return self.tuple._make(r)
|
||||
x, size = p.unpack(data, offset)
|
||||
if size > maxsize:
|
||||
maxsize = size
|
||||
r.append(x)
|
||||
return self.tuple._make(r), maxsize
|
||||
|
||||
|
||||
class VPPType():
|
||||
@ -310,13 +320,16 @@ class VPPType():
|
||||
def unpack(self, data, offset=0, result=None):
|
||||
# Return a list of arguments
|
||||
result = []
|
||||
total = 0
|
||||
for p in self.packers:
|
||||
x = p.unpack(data, offset, result)
|
||||
x, size = p.unpack(data, offset, result)
|
||||
if type(x) is tuple and len(x) == 1:
|
||||
x = x[0]
|
||||
result.append(x)
|
||||
offset += p.size
|
||||
return self.tuple._make(result)
|
||||
offset += size
|
||||
total += size
|
||||
t = self.tuple._make(result)
|
||||
return t, total
|
||||
|
||||
|
||||
class VPPMessage(VPPType):
|
||||
|
Reference in New Issue
Block a user