tests: cpus awareness

Introduce MAX_CPUS parameters to control maximum number of CPUs used by
VPP(s) during testing, with default value 'auto' corresponding to all
CPUs available.

Calculate test CPU requirements by taking into account the number of
workers, so a test requires 1 (main thread) + # of worker CPUs.

When running tests, keep track of both running test jobs (controlled by
TEST_JOBS parameter) and free CPUs. This then causes two limits in the
system - to not exceed number of jobs in parallel but also to not exceed
number of CPUs available.

Skip tests which require more CPUs than are available in system (or more
than MAX_CPUS) and print a warning message.

Type: improvement
Change-Id: Ib8fda54e4c6a36179d64160bb87fbd3a0011762d
Signed-off-by: Klement Sekera <ksekera@cisco.com>
This commit is contained in:
Klement Sekera
2021-04-08 19:37:41 +02:00
committed by Andrew Yourtchenko
parent f70cf23376
commit 558ceabc6c
8 changed files with 294 additions and 158 deletions

View File

@ -22,6 +22,7 @@ from inspect import getdoc, isclass
from traceback import format_exception
from logging import FileHandler, DEBUG, Formatter
from enum import Enum
from abc import ABC, abstractmethod
import scapy.compat
from scapy.packet import Raw
@ -42,6 +43,8 @@ from scapy.layers.inet import IPerror, TCPerror, UDPerror, ICMPerror
from scapy.layers.inet6 import ICMPv6DestUnreach, ICMPv6EchoRequest
from scapy.layers.inet6 import ICMPv6EchoReply
from cpu_config import available_cpus, num_cpus, max_vpp_cpus
logger = logging.getLogger(__name__)
# Set up an empty logger for the testcase that can be overridden as necessary
@ -53,6 +56,7 @@ FAIL = 1
ERROR = 2
SKIP = 3
TEST_RUN = 4
SKIP_CPU_SHORTAGE = 5
class BoolEnvironmentVariable(object):
@ -223,6 +227,21 @@ def _running_gcov_tests():
running_gcov_tests = _running_gcov_tests()
def get_environ_vpp_worker_count():
worker_config = os.getenv("VPP_WORKER_CONFIG", None)
if worker_config:
elems = worker_config.split(" ")
if elems[0] != "workers" or len(elems) != 2:
raise ValueError("Wrong VPP_WORKER_CONFIG == '%s' value." %
worker_config)
return int(elems[1])
else:
return 0
environ_vpp_worker_count = get_environ_vpp_worker_count()
class KeepAliveReporter(object):
"""
Singleton object which reports test start to parent process
@ -292,7 +311,21 @@ class DummyVpp:
pass
class VppTestCase(unittest.TestCase):
class CPUInterface(ABC):
cpus = []
skipped_due_to_cpu_lack = False
@classmethod
@abstractmethod
def get_cpus_required(cls):
pass
@classmethod
def assign_cpus(cls, cpus):
cls.cpus = cpus
class VppTestCase(CPUInterface, unittest.TestCase):
"""This subclass is a base class for VPP test cases that are implemented as
classes. It provides methods to create and run test case.
"""
@ -358,34 +391,18 @@ class VppTestCase(unittest.TestCase):
if dl == "gdb-all" or dl == "gdbserver-all":
cls.debug_all = True
@staticmethod
def get_least_used_cpu():
cpu_usage_list = [set(range(psutil.cpu_count()))]
vpp_processes = [p for p in psutil.process_iter(attrs=['pid', 'name'])
if 'vpp_main' == p.info['name']]
for vpp_process in vpp_processes:
for cpu_usage_set in cpu_usage_list:
try:
cpu_num = vpp_process.cpu_num()
if cpu_num in cpu_usage_set:
cpu_usage_set_index = cpu_usage_list.index(
cpu_usage_set)
if cpu_usage_set_index == len(cpu_usage_list) - 1:
cpu_usage_list.append({cpu_num})
else:
cpu_usage_list[cpu_usage_set_index + 1].add(
cpu_num)
cpu_usage_set.remove(cpu_num)
break
except psutil.NoSuchProcess:
pass
@classmethod
def get_vpp_worker_count(cls):
if not hasattr(cls, "vpp_worker_count"):
if cls.has_tag(TestCaseTag.FIXME_VPP_WORKERS):
cls.vpp_worker_count = 0
else:
cls.vpp_worker_count = environ_vpp_worker_count
return cls.vpp_worker_count
for cpu_usage_set in cpu_usage_list:
if len(cpu_usage_set) > 0:
min_usage_set = cpu_usage_set
break
return random.choice(tuple(min_usage_set))
@classmethod
def get_cpus_required(cls):
return 1 + cls.get_vpp_worker_count()
@classmethod
def setUpConstants(cls):
@ -417,20 +434,6 @@ class VppTestCase(unittest.TestCase):
if coredump_size is None:
coredump_size = "coredump-size unlimited"
cpu_core_number = cls.get_least_used_cpu()
if not hasattr(cls, "vpp_worker_count"):
cls.vpp_worker_count = 0
worker_config = os.getenv("VPP_WORKER_CONFIG", "")
if worker_config:
elems = worker_config.split(" ")
if elems[0] != "workers" or len(elems) != 2:
raise ValueError("Wrong VPP_WORKER_CONFIG == '%s' value." %
worker_config)
cls.vpp_worker_count = int(elems[1])
if cls.vpp_worker_count > 0 and\
cls.has_tag(TestCaseTag.FIXME_VPP_WORKERS):
cls.vpp_worker_count = 0
default_variant = os.getenv("VARIANT")
if default_variant is not None:
default_variant = "defaults { %s 100 }" % default_variant
@ -447,9 +450,10 @@ class VppTestCase(unittest.TestCase):
coredump_size, "runtime-dir", cls.tempdir, "}",
"api-trace", "{", "on", "}",
"api-segment", "{", "prefix", cls.get_api_segment_prefix(), "}",
"cpu", "{", "main-core", str(cpu_core_number), ]
if cls.vpp_worker_count:
cls.vpp_cmdline.extend(["workers", str(cls.vpp_worker_count)])
"cpu", "{", "main-core", str(cls.cpus[0]), ]
if cls.get_vpp_worker_count():
cls.vpp_cmdline.extend([
"corelist-workers", ",".join([str(x) for x in cls.cpus[1:]])])
cls.vpp_cmdline.extend([
"}",
"physmem", "{", "max-size", "32m", "}",
@ -509,11 +513,12 @@ class VppTestCase(unittest.TestCase):
@classmethod
def run_vpp(cls):
cls.logger.debug(f"Assigned cpus: {cls.cpus}")
cmdline = cls.vpp_cmdline
if cls.debug_gdbserver:
gdbserver = '/usr/bin/gdbserver'
if not os.path.isfile(gdbserver) or \
if not os.path.isfile(gdbserver) or\
not os.access(gdbserver, os.X_OK):
raise Exception("gdbserver binary '%s' does not exist or is "
"not executable" % gdbserver)
@ -1349,6 +1354,7 @@ class VppTestResult(unittest.TestResult):
self.verbosity = verbosity
self.result_string = None
self.runner = runner
self.printed = []
def addSuccess(self, test):
"""
@ -1383,7 +1389,10 @@ class VppTestResult(unittest.TestResult):
unittest.TestResult.addSkip(self, test, reason)
self.result_string = colorize("SKIP", YELLOW)
self.send_result_through_pipe(test, SKIP)
if reason == "not enough cpus":
self.send_result_through_pipe(test, SKIP_CPU_SHORTAGE)
else:
self.send_result_through_pipe(test, SKIP)
def symlink_failed(self):
if self.current_test_case_info:
@ -1501,28 +1510,34 @@ class VppTestResult(unittest.TestResult):
"""
def print_header(test):
if test.__class__ in self.printed:
return
test_doc = getdoc(test)
if not test_doc:
raise Exception("No doc string for test '%s'" % test.id())
test_title = test_doc.splitlines()[0]
test_title_colored = colorize(test_title, GREEN)
test_title = colorize(test_title, GREEN)
if test.is_tagged_run_solo():
# long live PEP-8 and 80 char width limitation...
c = YELLOW
test_title_colored = colorize("SOLO RUN: " + test_title, c)
test_title = colorize(f"SOLO RUN: {test_title}", YELLOW)
# This block may overwrite the colorized title above,
# but we want this to stand out and be fixed
if test.has_tag(TestCaseTag.FIXME_VPP_WORKERS):
c = RED
w = "FIXME with VPP workers: "
test_title_colored = colorize(w + test_title, c)
test_title = colorize(
f"FIXME with VPP workers: {test_title}", RED)
if not hasattr(test.__class__, '_header_printed'):
print(double_line_delim)
print(test_title_colored)
print(double_line_delim)
test.__class__._header_printed = True
if test.__class__.skipped_due_to_cpu_lack:
test_title = colorize(
f"{test_title} [skipped - not enough cpus, "
f"required={test.__class__.get_cpus_required()}, "
f"available={max_vpp_cpus}]", YELLOW)
print(double_line_delim)
print(test_title)
print(double_line_delim)
self.printed.append(test.__class__)
print_header(test)
self.start_test = time.time()