c0a2f0ec9b
Allow entering a test name on the stack trace window with the STEP=y option instead of a number. This allows running a whole suite and skipping all tests until a particular test is hit. Type: improvement Signed-off-by: Klement Sekera <ksekera@cisco.com> Change-Id: I23e45f8022b82545365b8921390e0e106e02b39c
253 lines
8.1 KiB
Python
253 lines
8.1 KiB
Python
import os
|
|
import sys
|
|
import traceback
|
|
import ipaddress
|
|
from subprocess import check_output, CalledProcessError
|
|
|
|
import scapy.compat
|
|
import framework
|
|
from log import RED, single_line_delim, double_line_delim
|
|
from util import check_core_path, get_core_path
|
|
|
|
|
|
class Hook:
    """
    Generic hooks before/after API/CLI calls

    Keeps a reference to the test case and to its logger. The ``before_*``
    hooks emit a debug message; the ``after_*`` hooks do nothing.
    """

    def __init__(self, test):
        """
        @param test: test case object; its ``logger`` attribute is reused
        """
        self.test = test
        self.logger = test.logger

    def before_api(self, api_name, api_args):
        """
        Function called before API call
        Emit a debug message describing the API name and arguments

        @param api_name: name of the API
        @param api_args: API arguments, a mapping of argument name to value
                         (it is iterated with ``.items()``)
        """

        def _describe(value):
            # Only str values are decorated; anything else is shown as-is.
            if isinstance(value, str):
                if len(value) == 6:
                    # presumably a packed MAC address - append the value
                    # rendered as colon-separated hex octets
                    octets = ['{:02x}'.format(scapy.compat.orb(ch))
                              for ch in value]
                    return '{!s} ({!s})'.format(value, ':'.join(octets))
                try:
                    # we don't call test_type(value) because it is a packed
                    # value.
                    readable = str(ipaddress.ip_address(value))
                except ValueError:
                    # not an IP address either - leave it untouched
                    return value
                return '{!s} ({!s})'.format(value, readable)
            return value

        rendered = []
        for name, value in api_args.items():
            rendered.append("{!s}={!r}".format(name, _describe(value)))
        message = "API: %s (%s)" % (api_name, ', '.join(rendered))
        self.logger.debug(message, extra={'color': RED})

    def after_api(self, api_name, api_args):
        """
        Function called after API call

        Intentionally a no-op here.

        @param api_name: name of the API
        @param api_args: API arguments (same object as given to before_api)
        """
        return None

    def before_cli(self, cli):
        """
        Function called before CLI call
        Emit a debug message describing the CLI

        @param cli: CLI string
        """
        message = "CLI: %s" % (cli)
        self.logger.debug(message, extra={'color': RED})

    def after_cli(self, cli):
        """
        Function called after CLI call

        Intentionally a no-op here.

        @param cli: CLI string
        """
        return None
|
|
|
|
|
|
class PollHook(Hook):
    """ Hook which checks if the vpp subprocess is alive """

    def __init__(self, test):
        super(PollHook, self).__init__(test)

    def on_crash(self, core_path):
        """
        Log how to debug a core file and what the `file` utility says
        about it

        Failures of the `file` utility are logged, never propagated.

        :param core_path: path to the core file
        """
        self.logger.error("Core file present, debug with: gdb %s %s",
                          self.test.vpp_bin, core_path)
        check_core_path(self.logger, core_path)
        self.logger.error("Running `file %s':", core_path)
        try:
            # universal_newlines=True makes check_output return text, so the
            # log shows the plain output instead of a b'...' repr on python3
            info = check_output(["file", core_path],
                                universal_newlines=True)
        except CalledProcessError as e:
            self.logger.error(
                "Subprocess returned with error running `file' utility on "
                "core-file, "
                "rc=%s", e.returncode)
        except OSError as e:
            self.logger.error(
                "Subprocess returned OS error running `file' utility on "
                "core-file, "
                "oserror=(%s) %s", e.errno, e.strerror)
        except Exception as e:
            self.logger.error(
                "Subprocess returned unanticipated error running `file' "
                "utility on core-file, "
                "%s", e)
        else:
            self.logger.error(info)

    def poll_vpp(self):
        """
        Poll the vpp status and throw an exception if it's not running

        When vpp is found dead, the test is flagged as ``vpp_dead`` and, if a
        core file exists in the test's temporary directory, it is reported
        via :meth:`on_crash` before the exception is raised.

        :raises VppDiedError: exception if VPP is not running anymore
        """
        if self.test.vpp_dead:
            # already dead, nothing to do
            return

        self.test.vpp.poll()
        if self.test.vpp.returncode is None:
            # still running
            return

        # set the flag first so later polls are no-ops even if the
        # reporting below fails
        self.test.vpp_dead = True
        # The core file must be reported *before* raising - statements
        # placed after the raise would never execute.
        core_path = get_core_path(self.test.tempdir)
        if os.path.isfile(core_path):
            self.on_crash(core_path)
        raise framework.VppDiedError(rv=self.test.vpp.returncode)

    def before_api(self, api_name, api_args):
        """
        Check if VPP died before executing an API

        :param api_name: name of the API
        :param api_args: API arguments, passed through to the parent hook
        :raises VppDiedError: exception if VPP is not running anymore

        """
        super(PollHook, self).before_api(api_name, api_args)
        self.poll_vpp()

    def before_cli(self, cli):
        """
        Check if VPP died before executing a CLI

        :param cli: CLI string
        :raises VppDiedError: exception if VPP is not running anymore

        """
        super(PollHook, self).before_cli(cli)
        self.poll_vpp()
|
|
|
|
|
|
class StepHook(PollHook):
    """ Hook which requires user to press ENTER before doing any API/CLI """

    def __init__(self, test):
        # stack captured when the user picked a frame number (None = unset)
        self.skip_stack = None
        # index of the chosen frame within skip_stack
        self.skip_num = None
        # number of calls skipped so far for the chosen frame
        self.skip_count = 0
        # name of the function to run to without pausing (None = unset)
        self.break_func = None
        super(StepHook, self).__init__(test)

    def skip(self):
        """
        Decide whether the pause before the current API/CLI call is skipped

        A pending function-name breakpoint takes precedence over a pending
        stack-frame selection.

        :returns: True if the pause should be skipped, False otherwise
        """
        if self.break_func is not None:
            return self.should_skip_func_based()
        if self.skip_stack is not None:
            return self.should_skip_stack_based()
        return False

    def should_skip_func_based(self):
        """
        Skip until a function named ``self.break_func`` is on the call stack

        Once that function is found the breakpoint is cleared, so stepping
        resumes from this call on.

        :returns: False if the function is on the stack, True otherwise
        """
        # frame[2] is the function name of the stack entry
        for frame in traceback.extract_stack():
            if frame[2] == self.break_func:
                self.break_func = None
                return False
        return True

    def should_skip_stack_based(self):
        """
        Skip while execution is still in/below the user-selected stack frame

        Frames 0..``self.skip_num`` of the current stack are compared (file
        name and line number) with the stack saved in ``self.skip_stack``.
        On the first mismatch the selection is cleared and the number of
        skipped calls is printed.

        :returns: True if all compared frames match, False otherwise
        """
        current = traceback.extract_stack()
        reference = self.skip_stack[:self.skip_num + 1]
        # a stack shallower than the chosen frame cannot be inside it
        matching = len(current) >= len(reference) and all(
            now[0] == then[0] and now[1] == then[1]
            for now, then in zip(current, reference))
        if matching:
            self.skip_count += 1
            return True

        print("%d API/CLI calls skipped in specified stack "
              "frame" % self.skip_count)
        self.skip_count = 0
        self.skip_stack = None
        self.skip_num = None
        return False

    def user_input(self):
        """
        Print the current stack and ask the user how to continue

        Accepted input (surrounding whitespace is ignored):

        * empty line (or EOF) - just continue, pause at the next call
        * a stack frame number - do not pause for calls made in/below that
          frame
        * a name starting with ``test_`` - do not pause until a function of
          that name is on the call stack

        Anything else is rejected and the prompt is repeated.
        """
        print('number\tfunction\tfile\tcode')
        stack = traceback.extract_stack()
        for index, frame in enumerate(stack):
            print('%02d.\t%s\t%s:%d\t[%s]' %
                  (index, frame[2], frame[0], frame[1], frame[3]))
        print(single_line_delim)
        print("You may enter a number of stack frame chosen from above")
        print("Calls in/below that stack frame will not be stepped anymore")
        print("Alternatively, enter a test function name to stop at")
        print(single_line_delim)
        while True:
            print("Enter your choice, if any, and press ENTER to continue "
                  "running the testcase...")
            # strip all surrounding whitespace - a stray space after a test
            # name would otherwise produce a name which never matches
            choice = sys.stdin.readline().strip()
            if not choice:
                return
            if choice.startswith("test_"):
                self.break_func = choice
                return
            try:
                num = int(choice)
            except ValueError:
                print("Invalid input")
                continue
            if num < 0 or num >= len(stack):
                print("Invalid choice")
                continue
            self.break_func = None
            self.skip_stack = stack
            self.skip_num = num
            return

    def before_cli(self, cli):
        """
        Wait for ENTER before executing CLI

        :param cli: CLI string
        """
        if self.skip():
            print("Skip pause before executing CLI: %s" % cli)
        else:
            print(double_line_delim)
            print("Test paused before executing CLI: %s" % cli)
            print(single_line_delim)
            self.user_input()
        super(StepHook, self).before_cli(cli)

    def before_api(self, api_name, api_args):
        """
        Wait for ENTER before executing API

        :param api_name: name of the API
        :param api_args: API arguments, printed as-is and passed through to
                         the parent hook
        """
        if self.skip():
            print("Skip pause before executing API: %s (%s)"
                  % (api_name, api_args))
        else:
            print(double_line_delim)
            print("Test paused before executing API: %s (%s)"
                  % (api_name, api_args))
            print(single_line_delim)
            self.user_input()
        super(StepHook, self).before_api(api_name, api_args)
|