tests: move test source to vpp/test
- Generate copyright year and version instead of using hard-coded data Type: refactor Signed-off-by: Dave Wallace <dwallacelf@gmail.com> Change-Id: I6058f5025323b3aa483f5df4a2c4371e27b5914e
This commit is contained in:
committed by
Damjan Marion
parent
fd77f8c00c
commit
eddd8e3588
@@ -0,0 +1,179 @@
|
||||
from ipaddress import IPv4Address, AddressValueError
|
||||
from vpp_object import VppObject
|
||||
from vpp_papi import VppEnum
|
||||
|
||||
|
||||
class AuthMethod:
|
||||
v = {'rsa-sig': 1,
|
||||
'shared-key': 2}
|
||||
|
||||
@staticmethod
|
||||
def value(key): return AuthMethod.v[key]
|
||||
|
||||
|
||||
class IDType:
|
||||
v = {'ip4-addr': 1,
|
||||
'fqdn': 2,
|
||||
'ip6-addr': 5}
|
||||
|
||||
@staticmethod
|
||||
def value(key): return IDType.v[key]
|
||||
|
||||
|
||||
class Profile(VppObject):
|
||||
""" IKEv2 profile """
|
||||
def __init__(self, test, profile_name):
|
||||
self.test = test
|
||||
self.vapi = test.vapi
|
||||
self.profile_name = profile_name
|
||||
self.udp_encap = False
|
||||
self.natt = True
|
||||
|
||||
def disable_natt(self):
|
||||
self.natt = False
|
||||
|
||||
def add_auth(self, method, data, is_hex=False):
|
||||
if isinstance(method, int):
|
||||
m = method
|
||||
elif isinstance(method, str):
|
||||
m = AuthMethod.value(method)
|
||||
else:
|
||||
raise Exception('unsupported type {}'.format(method))
|
||||
self.auth = {'auth_method': m,
|
||||
'data': data,
|
||||
'is_hex': is_hex}
|
||||
|
||||
def add_local_id(self, id_type, data):
|
||||
if isinstance(id_type, str):
|
||||
t = IDType.value(id_type)
|
||||
self.local_id = {'id_type': t,
|
||||
'data': data,
|
||||
'is_local': True}
|
||||
|
||||
def add_remote_id(self, id_type, data):
|
||||
if isinstance(id_type, str):
|
||||
t = IDType.value(id_type)
|
||||
self.remote_id = {'id_type': t,
|
||||
'data': data,
|
||||
'is_local': False}
|
||||
|
||||
def add_local_ts(self, start_addr, end_addr, start_port=0, end_port=0xffff,
|
||||
proto=0, is_ip4=True):
|
||||
self.ts_is_ip4 = is_ip4
|
||||
self.local_ts = {'is_local': True,
|
||||
'protocol_id': proto,
|
||||
'start_port': start_port,
|
||||
'end_port': end_port,
|
||||
'start_addr': start_addr,
|
||||
'end_addr': end_addr}
|
||||
|
||||
def add_remote_ts(self, start_addr, end_addr, start_port=0,
|
||||
end_port=0xffff, proto=0):
|
||||
try:
|
||||
IPv4Address(start_addr)
|
||||
is_ip4 = True
|
||||
except AddressValueError:
|
||||
is_ip4 = False
|
||||
self.ts_is_ip4 = is_ip4
|
||||
self.remote_ts = {'is_local': False,
|
||||
'protocol_id': proto,
|
||||
'start_port': start_port,
|
||||
'end_port': end_port,
|
||||
'start_addr': start_addr,
|
||||
'end_addr': end_addr}
|
||||
|
||||
def add_responder_hostname(self, hn):
|
||||
self.responder_hostname = hn
|
||||
|
||||
def add_responder(self, responder):
|
||||
self.responder = responder
|
||||
|
||||
def add_ike_transforms(self, tr):
|
||||
self.ike_transforms = tr
|
||||
|
||||
def add_esp_transforms(self, tr):
|
||||
self.esp_transforms = tr
|
||||
|
||||
def set_udp_encap(self, udp_encap):
|
||||
self.udp_encap = udp_encap
|
||||
|
||||
def set_lifetime_data(self, data):
|
||||
self.lifetime_data = data
|
||||
|
||||
def set_ipsec_over_udp_port(self, port):
|
||||
self.ipsec_udp_port = {'is_set': 1,
|
||||
'port': port}
|
||||
|
||||
def set_tunnel_interface(self, sw_if_index):
|
||||
self.tun_itf = sw_if_index
|
||||
|
||||
def object_id(self):
|
||||
return 'ikev2-profile-%s' % self.profile_name
|
||||
|
||||
def remove_vpp_config(self):
|
||||
self.vapi.ikev2_profile_add_del(name=self.profile_name, is_add=False)
|
||||
|
||||
def add_vpp_config(self):
|
||||
self.vapi.ikev2_profile_add_del(name=self.profile_name, is_add=True)
|
||||
if hasattr(self, 'auth'):
|
||||
self.vapi.ikev2_profile_set_auth(name=self.profile_name,
|
||||
data_len=len(self.auth['data']),
|
||||
**self.auth)
|
||||
if hasattr(self, 'local_id'):
|
||||
self.vapi.ikev2_profile_set_id(name=self.profile_name,
|
||||
data_len=len(self.local_id
|
||||
['data']),
|
||||
**self.local_id)
|
||||
if hasattr(self, 'remote_id'):
|
||||
self.vapi.ikev2_profile_set_id(name=self.profile_name,
|
||||
data_len=len(self.remote_id
|
||||
['data']),
|
||||
**self.remote_id)
|
||||
if hasattr(self, 'local_ts'):
|
||||
self.vapi.ikev2_profile_set_ts(name=self.profile_name,
|
||||
ts=self.local_ts)
|
||||
|
||||
if hasattr(self, 'remote_ts'):
|
||||
self.vapi.ikev2_profile_set_ts(name=self.profile_name,
|
||||
ts=self.remote_ts)
|
||||
|
||||
if hasattr(self, 'responder'):
|
||||
self.vapi.ikev2_set_responder(name=self.profile_name,
|
||||
responder=self.responder)
|
||||
|
||||
if hasattr(self, 'responder_hostname'):
|
||||
print(self.responder_hostname)
|
||||
self.vapi.ikev2_set_responder_hostname(name=self.profile_name,
|
||||
**self.responder_hostname)
|
||||
|
||||
if hasattr(self, 'ike_transforms'):
|
||||
self.vapi.ikev2_set_ike_transforms(name=self.profile_name,
|
||||
tr=self.ike_transforms)
|
||||
|
||||
if hasattr(self, 'esp_transforms'):
|
||||
self.vapi.ikev2_set_esp_transforms(name=self.profile_name,
|
||||
tr=self.esp_transforms)
|
||||
|
||||
if self.udp_encap:
|
||||
self.vapi.ikev2_profile_set_udp_encap(name=self.profile_name)
|
||||
|
||||
if hasattr(self, 'lifetime_data'):
|
||||
self.vapi.ikev2_set_sa_lifetime(name=self.profile_name,
|
||||
**self.lifetime_data)
|
||||
|
||||
if hasattr(self, 'ipsec_udp_port'):
|
||||
self.vapi.ikev2_profile_set_ipsec_udp_port(name=self.profile_name,
|
||||
**self.ipsec_udp_port)
|
||||
if hasattr(self, 'tun_itf'):
|
||||
self.vapi.ikev2_set_tunnel_interface(name=self.profile_name,
|
||||
sw_if_index=self.tun_itf)
|
||||
|
||||
if not self.natt:
|
||||
self.vapi.ikev2_profile_disable_natt(name=self.profile_name)
|
||||
|
||||
def query_vpp_config(self):
|
||||
res = self.vapi.ikev2_profile_dump()
|
||||
for r in res:
|
||||
if r.profile.name == self.profile_name:
|
||||
return r.profile
|
||||
return None
|
||||
Reference in New Issue
Block a user