vpp/test/vpp_qemu_utils.py
Ivan Ivanets 9765e27635 tests: vpp_qemu_utils with concurrency handling
Type: test

Enhance the vpp_qemu_utils functions with mutex locking, handling of
namespace and host interface existence, unique namespace/interface
names, error handling with retries, and explicit error code checks.

Change-Id: I1ea66eeefbc1fee9b58e8b9886f4dd6fd8d33444
Signed-off-by: Ivan Ivanets <iivanets@cisco.com>
2024-11-19 16:52:48 +00:00

399 lines
13 KiB
Python

#!/usr/bin/env python
# Utility functions for QEMU tests ##
import subprocess
import sys
import os
import time
import random
import string
from multiprocessing import Lock, Process
# Module-level multiprocessing lock. The helper functions in this file
# acquire it (``with lock:``) around their ip/ethtool subprocess calls.
lock = Lock()
def can_create_namespaces(namespace="vpp_chk_4212"):
    """Probe whether this environment permits creating network namespaces.

    A throw-away namespace called ``namespace`` is added with
    ``ip netns add`` and, if that worked, removed again.

    Returns True when the add command exits with status 0, and False when
    it exits non-zero or when running either command raises an exception.
    """
    netns = ["ip", "netns"]
    with lock:
        try:
            probe = subprocess.run(netns + ["add", namespace], capture_output=True)
            created = probe.returncode == 0
            if created:
                # Clean up the probe namespace; the exit status of the
                # delete command is not examined.
                subprocess.run(netns + ["del", namespace], capture_output=True)
            return created
        except Exception:
            return False
def create_namespace(history_file, ns=None):
    """Create one or more namespaces and record them in a history file.

    arguments:
    history_file -- path of a file to which the name of every namespace
                    created here is appended, one name per line
    ns -- None, a namespace name, or a sequence of namespace names.
          With None, a random name of the form "vpp_ns<8 chars>" is used.

    Returns the generated name when ns is None, otherwise ns unchanged.

    Raises Exception when a namespace cannot be created within 5 attempts.
    """
    retries = 5
    with lock:
        if ns is None:
            result = None
            for _ in range(retries):
                suffix = "".join(
                    random.choices(string.ascii_lowercase + string.digits, k=8)
                )
                new_namespace_name = f"vpp_ns{suffix}"
                # A non-zero exit (e.g. the name is already taken) leads to
                # another attempt with a freshly generated random name.
                result = subprocess.run(
                    ["ip", "netns", "add", new_namespace_name],
                    capture_output=True,
                    text=True,
                )
                if result.returncode == 0:
                    with open(history_file, "a") as ns_file:
                        ns_file.write(f"{new_namespace_name}\n")
                    return new_namespace_name
            error_message = result.stderr if result else "Unknown error"
            raise Exception(
                f"Failed to generate a unique namespace name after {retries} attempts. "
                f"Error from last attempt: {error_message}"
            )

        namespaces = [ns] if isinstance(ns, str) else ns
        for namespace in namespaces:
            for _ in range(retries):
                result = subprocess.run(
                    ["ip", "netns", "add", namespace],
                    capture_output=True,
                    text=True,
                )
                if result.returncode == 0:
                    with open(history_file, "a") as ns_file:
                        ns_file.write(f"{namespace}\n")
                    break
            else:
                # text=True makes stderr a str already, so it must not be
                # decoded again when building the error message.
                raise Exception(
                    f"Failed to create namespace {namespace} after {retries} attempts. Error: {result.stderr}"
                )
        return ns
def add_namespace_route(ns, prefix, gw_ip):
    """Add a route to a namespace.

    arguments:
    ns -- namespace string value
    prefix -- NETWORK/MASK or "default"
    gw_ip -- Gateway IP

    Raises Exception when the "ip route add" command exits non-zero.
    """
    with lock:
        result = subprocess.run(
            ["ip", "netns", "exec", ns, "ip", "route", "add", prefix, "via", gw_ip],
            capture_output=True,
        )
        # subprocess.run() raises CalledProcessError only when check=True is
        # passed, so the exit status has to be tested explicitly.
        if result.returncode != 0:
            raise Exception(
                f"Error adding route to namespace: {result.stderr.decode()}"
            )
def delete_all_host_interfaces(history_file):
    """Remove every host interface recorded in the history file, then the file.

    arguments:
    history_file -- path of a text file holding one interface name per
                    line; blank lines are skipped. Nothing is done when
                    the file does not exist.
    """
    with lock:
        if not os.path.exists(history_file):
            return
        with open(history_file, "r") as if_file:
            recorded = [entry.strip() for entry in if_file]
        for if_name in filter(None, recorded):
            _delete_host_interfaces(if_name)
        # Only reached when no deletion raised, so a failed cleanup leaves
        # the history file in place.
        os.remove(history_file)
def _delete_host_interfaces(*host_interface_names):
"""Delete host interfaces.
arguments:
host_interface_names - sequence of host interface names to be deleted
"""
for host_interface_name in host_interface_names:
retries = 3
for attempt in range(retries):
check_result = subprocess.run(
["ip", "link", "show", host_interface_name],
capture_output=True,
text=True,
)
if check_result.returncode != 0:
break
result = subprocess.run(
["ip", "link", "del", host_interface_name],
capture_output=True,
text=True,
)
if result.returncode == 0:
break
if attempt < retries - 1:
time.sleep(1)
else:
raise Exception(
f"Failed to delete host interface {host_interface_name} after {retries} attempts"
)
def create_host_interface(
    history_file, host_namespace, *host_ip_prefixes, vpp_if_name=None, host_if_name=None
):
    """Create a veth pair and configure its host side inside a namespace.

    arguments:
    history_file -- path of a file to which both interface names are
                    appended (one per line) once the pair is created
    host_namespace -- host namespace into which the host_interface needs to be set
    host_ip_prefixes -- a sequence of ip/prefix-lengths to be set
                        on the host_interface
    vpp_if_name -- name of the veth interface on the VPP side
    host_if_name -- name of the veth interface on the host side

    A name that is not supplied is generated randomly ("vppout<8 chars>" /
    "hostif<8 chars>"). Creating the pair is attempted up to 5 times.

    Returns the tuple (host_if_name, vpp_if_name) actually used.

    Raises Exception when the pair cannot be created or when any of the
    follow-up ip commands exits non-zero.
    """

    def _random_suffix():
        return "".join(random.choices(string.ascii_lowercase + string.digits, k=8))

    retries = 5
    in_namespace = ["ip", "netns", "exec", host_namespace]
    with lock:
        for _ in range(retries):
            host_candidate = host_if_name or f"hostif{_random_suffix()}"
            vpp_candidate = vpp_if_name or f"vppout{_random_suffix()}"
            created = subprocess.run(
                ["ip", "link", "add", "name", vpp_candidate]
                + ["type", "veth", "peer", "name", host_candidate],
                capture_output=True,
            )
            if created.returncode == 0:
                break
        else:
            raise Exception(
                f"Failed to create host interface {host_candidate} and vpp {vpp_candidate} after {retries} attempts. Error: {created.stderr.decode()}"
            )

        host_if_name, vpp_if_name = host_candidate, vpp_candidate
        with open(history_file, "a") as if_file:
            if_file.write(f"{host_if_name}\n{vpp_if_name}\n")

        # Follow-up commands in execution order, each paired with the text
        # that prefixes the command's stderr if it fails.
        steps = [
            (
                ["ip", "link", "set", host_if_name, "netns", host_namespace],
                "Error setting host interface namespace: ",
            ),
            (
                ["ip", "link", "set", "dev", vpp_if_name, "up"],
                "Error bringing up the host interface: ",
            ),
            (
                in_namespace + ["ip", "link", "set", "dev", host_if_name, "up"],
                "Error bringing up the host interface in namespace: ",
            ),
        ]
        steps.extend(
            (
                in_namespace + ["ip", "addr", "add", host_ip_prefix, "dev", host_if_name],
                "Error setting ip prefix on the host interface: ",
            )
            for host_ip_prefix in host_ip_prefixes
        )
        for command, error_prefix in steps:
            outcome = subprocess.run(command, capture_output=True)
            if outcome.returncode != 0:
                raise Exception(f"{error_prefix}{outcome.stderr.decode()}")
    return host_if_name, vpp_if_name
def set_interface_mtu(namespace, interface, mtu, logger):
    """Set the MTU of a linux device interface via ``ip link set mtu``.

    arguments:
    namespace -- namespace in which to run the command; a falsy value runs
                 it without ``ip netns exec``
    interface -- name of the device
    mtu -- MTU value; converted with str() for the command line
    logger -- not used by this function

    The command is attempted up to 3 times, one second apart.

    Raises Exception when every attempt exits non-zero.
    """
    command = ["ip", "link", "set", "mtu", str(mtu), "dev", interface]
    if namespace:
        command = ["ip", "netns", "exec", namespace, *command]
    retries = 3
    with lock:
        for remaining in reversed(range(retries)):
            if subprocess.run(command, capture_output=True).returncode == 0:
                return
            if remaining == 0:
                raise Exception(
                    f"Failed to set MTU on interface {interface} in namespace {namespace} after {retries} attempts"
                )
            # Pause before the next attempt.
            time.sleep(1)
def enable_interface_gso(namespace, interface):
    """Enable GSO offload on a linux device interface.

    Runs ``ethtool -K <interface> rx on tx on``, wrapped in
    ``ip netns exec <namespace>`` when namespace is truthy.

    Raises Exception, carrying the command's stderr, on a non-zero exit.
    """
    netns_prefix = ["ip", "netns", "exec", namespace] if namespace else []
    command = netns_prefix + ["ethtool", "-K", interface, "rx", "on", "tx", "on"]
    with lock:
        outcome = subprocess.run(command, capture_output=True)
        if outcome.returncode == 0:
            return
        raise Exception(
            f"Error enabling GSO offload on interface {interface} in namespace {namespace}: {outcome.stderr.decode()}"
        )
def disable_interface_gso(namespace, interface):
    """Disable GSO offload on a linux device interface.

    Runs ``ethtool -K <interface> rx off tx off``, wrapped in
    ``ip netns exec <namespace>`` when namespace is truthy.

    Raises Exception, carrying the command's stderr, on a non-zero exit.
    """
    netns_prefix = ["ip", "netns", "exec", namespace] if namespace else []
    command = netns_prefix + ["ethtool", "-K", interface, "rx", "off", "tx", "off"]
    with lock:
        outcome = subprocess.run(command, capture_output=True)
        if outcome.returncode == 0:
            return
        raise Exception(
            f"Error disabling GSO offload on interface {interface} in namespace {namespace}: {outcome.stderr.decode()}"
        )
def delete_all_namespaces(history_file):
    """Remove every namespace recorded in the history file, then the file.

    arguments:
    history_file -- path of a text file holding one namespace name per
                    line; blank lines are skipped. Nothing is done when
                    the file does not exist.
    """
    with lock:
        if not os.path.exists(history_file):
            return
        with open(history_file, "r") as ns_file:
            recorded = [entry.strip() for entry in ns_file]
        for ns_name in filter(None, recorded):
            _delete_namespace(ns_name)
        # Only reached when no deletion raised, so a failed cleanup leaves
        # the history file in place.
        os.remove(history_file)
def _delete_namespace(ns):
    """Delete one or more namespaces.

    arguments:
    ns -- a list of namespace names or namespace

    Names that do not appear in the output of ``ip netns list`` are
    skipped. Each remaining namespace gets up to 3 delete attempts, one
    second apart.

    Raises Exception when a namespace is still not deleted after the last
    attempt.
    """
    namespaces = [ns] if isinstance(ns, str) else ns

    listing = subprocess.run(["ip", "netns", "list"], capture_output=True, text=True)
    existing_namespaces = set()
    for entry in listing.stdout.splitlines():
        # Only the first whitespace-separated token of a line is the name.
        existing_namespaces.add(entry.split()[0])

    retries = 3
    for namespace in namespaces:
        if namespace not in existing_namespaces:
            continue
        for remaining in reversed(range(retries)):
            removal = subprocess.run(
                ["ip", "netns", "del", namespace], capture_output=True
            )
            if removal.returncode == 0:
                break
            if remaining == 0:
                raise Exception(
                    f"Failed to delete namespace {namespace} after {retries} attempts"
                )
            # Pause before the next attempt.
            time.sleep(1)
def list_namespace(ns):
    """List the IP addresses of a namespace.

    Runs ``ip addr`` inside namespace ns.

    Returns the command's standard output decoded to text.

    Raises Exception, carrying the command's stderr, on a non-zero exit.
    """
    with lock:
        result = subprocess.run(
            ["ip", "netns", "exec", ns, "ip", "addr"], capture_output=True
        )
        if result.returncode != 0:
            raise Exception(
                f"Error listing IP addresses in namespace {ns}: {result.stderr.decode()}"
            )
        # The output is captured rather than printed, so hand it back to
        # the caller instead of silently dropping the listing.
        return result.stdout.decode()
def libmemif_test_app(memif_sock_path, logger):
    """Build & run the libmemif test_app for memif interface testing.

    arguments:
    memif_sock_path -- value passed to the app's "-s" option
    logger -- object whose info()/error() methods are used for reporting

    The app is expected at <ws_root>/extras/libmemif/build/examples/test_app,
    where ws_root is the parent of this file's directory. It is built with
    cmake and make when that file does not exist.

    Returns the started multiprocessing.Process that runs the app.

    Raises subprocess.CalledProcessError when cmake or make fails.
    """
    test_dir = os.path.dirname(os.path.realpath(__file__))
    ws_root = os.path.dirname(test_dir)
    libmemif_app = os.path.join(
        ws_root, "extras", "libmemif", "build", "examples", "test_app"
    )

    def build_libmemif_app():
        """Build the app in the libmemif build directory if it is missing."""
        if os.path.exists(libmemif_app):
            return
        logger.info(f"Building app:{libmemif_app} for memif interface testing")
        libmemif_app_dir = os.path.join(ws_root, "extras", "libmemif", "build")
        os.makedirs(libmemif_app_dir, exist_ok=True)
        # Run the build tools with cwd= instead of os.chdir(), so the working
        # directory of the calling process is left untouched.
        subprocess.run(["cmake", ".."], check=True, cwd=libmemif_app_dir)
        subprocess.run(["make"], check=True, cwd=libmemif_app_dir)

    def start_libmemif_app():
        """Restart once if the initial run fails."""
        max_tries = 2
        for _ in range(max_tries):
            result = subprocess.run(
                [libmemif_app, "-b", "9216", "-s", memif_sock_path], capture_output=True
            )
            if result.returncode == 0:
                break
            logger.error(
                f"Restarting libmemif app due to error: {result.stderr.decode()}"
            )
            time.sleep(1)

    build_libmemif_app()
    # NOTE(review): the target is a nested function, which cannot be pickled;
    # this presumably relies on the "fork" start method - confirm.
    process = Process(target=start_libmemif_app)
    process.start()
    return process