d9b0c6fbf7
Drop pycodestyle for code style checking in favor of black. Black is a much faster, stable, PEP8-compliant code style checker that also offers automatic formatting. It aims to be very stable and to produce the smallest diffs. It's used by many projects, both small and large. Running checkstyle with black takes a few seconds and produces terse output. Thus, test-checkstyle-diff is no longer necessary. Expand the scope of checkstyle to all python files in the repo, replacing test-checkstyle with checkstyle-python. Also, fixstyle-python is now available for automatic style formatting. Note: the python virtualenv has been consolidated in test/Makefile and test/requirements*.txt, which will eventually be moved to a central location. This is required to simplify the automated generation of docker executor images in the CI. Type: improvement Change-Id: I022a326603485f58585e879ac0f697fceefbc9c8 Signed-off-by: Klement Sekera <klement.sekera@gmail.com> Signed-off-by: Dave Wallace <dwallacelf@gmail.com>
233 lines
7.2 KiB
Python
233 lines
7.2 KiB
Python
#!/usr/bin/env python3
|
|
|
|
import unittest
|
|
from framework import VppTestCase, VppTestRunner
|
|
from util import ppp
|
|
from scapy.packet import Raw
|
|
from scapy.layers.inet import IP, UDP
|
|
from syslog_rfc5424_parser import SyslogMessage, ParseError
|
|
from syslog_rfc5424_parser.constants import SyslogFacility, SyslogSeverity
|
|
from vpp_papi import VppEnum
|
|
|
|
|
|
class TestSyslog(VppTestCase):
    """Syslog Protocol Test Cases"""

    # Keywords accepted by the "test syslog" CLI for the facility argument,
    # indexed by numeric facility code. Codes 10 and 15 map to an empty string
    # in this table (presumably the CLI has no keyword for them - TODO confirm).
    _FACILITY_NAMES = (
        "kernel",
        "user-level",
        "mail-system",
        "system-daemons",
        "security-authorization",
        "syslogd",
        "line-printer",
        "network-news",
        "uucp",
        "clock-daemon",
        "",
        "ftp-daemon",
        "ntp-subsystem",
        "log-audit",
        "log-alert",
        "",
        "local0",
        "local1",
        "local2",
        "local3",
        "local4",
        "local5",
        "local6",
        "local7",
    )

    # Keywords accepted by the "test syslog" CLI for the severity argument,
    # indexed by numeric severity level.
    _SEVERITY_NAMES = (
        "emergency",
        "alert",
        "critical",
        "error",
        "warning",
        "notice",
        "informational",
        "debug",
    )

    @property
    def SYSLOG_SEVERITY(self):
        """Shortcut to the ``vl_api_syslog_severity_t`` API enum."""
        return VppEnum.vl_api_syslog_severity_t

    @classmethod
    def setUpClass(cls):
        """Create, bring up and address the single pg interface used here.

        If interface setup fails, the parent class teardown is invoked
        before the exception is re-raised.
        """
        super().setUpClass()

        try:
            (cls.pg0,) = cls.create_pg_interfaces(range(1))
            cls.pg0.admin_up()
            cls.pg0.config_ip4()
            cls.pg0.resolve_arp()

        except Exception:
            super().tearDownClass()
            raise

    @classmethod
    def tearDownClass(cls):
        """Delegate class-level teardown to the parent test case."""
        super().tearDownClass()

    def syslog_generate(self, facility, severity, appname, msgid, sd=None, msg=None):
        """
        Generate syslog message

        Builds a ``test syslog ...`` command line and runs it through
        ``self.vapi.cli``.

        :param facility: facility value (index into the facility keywords)
        :param severity: severity level (index into the severity keywords)
        :param appname: application name that originate message
        :param msgid: message identifier
        :param sd: structured data (optional), a mapping of SD-ID to a
                   mapping of parameter name to value
        :param msg: free-form message (optional)
        """
        cli_parts = [
            "test syslog",
            self._FACILITY_NAMES[facility],
            self._SEVERITY_NAMES[severity],
            str(appname),
            str(msgid),
        ]
        if sd is not None:
            for sd_id, sd_params in sd.items():
                cli_parts.append("sd-id %s" % (sd_id,))
                for name, value in sd_params.items():
                    cli_parts.append("sd-param %s %s" % (name, value))
        if msg is not None:
            # The free-form text must come last on the command line.
            cli_parts.append(str(msg))
        self.vapi.cli(" ".join(cli_parts))

    def syslog_verify(
        self, data, facility, severity, appname, msgid, sd=None, msg=None
    ):
        """
        Verify syslog message

        Decodes ``data`` as UTF-8, parses it with ``SyslogMessage.parse`` and
        compares every parsed field with the expected value. The hostname is
        expected to equal the local IPv4 address of ``pg0`` and the version
        to be 1.

        :param data: syslog message (raw bytes)
        :param facility: facility value
        :param severity: severity level
        :param appname: application name that originate message
        :param msgid: message identifier
        :param sd: structured data (optional); ``None`` is compared as ``{}``
        :param msg: free-form message (optional)
        :raises ParseError: if the payload cannot be parsed; the error is
                            logged before being re-raised
        """
        text = data.decode("utf-8")
        if sd is None:
            sd = {}
        try:
            message = SyslogMessage.parse(text)
        except ParseError as e:
            self.logger.error(e)
            raise

        self.assertEqual(message.facility, facility)
        self.assertEqual(message.severity, severity)
        self.assertEqual(message.appname, appname)
        self.assertEqual(message.msgid, msgid)
        self.assertEqual(message.msg, msg)
        self.assertEqual(message.sd, sd)
        self.assertEqual(message.version, 1)
        self.assertEqual(message.hostname, self.pg0.local_ip4)

    def test_syslog(self):
        """Syslog Protocol test"""
        # Configure the sender and check the values reported back, including
        # the port, VRF and maximum message size that were not set explicitly.
        self.vapi.syslog_set_sender(
            src_address=self.pg0.local_ip4, collector_address=self.pg0.remote_ip4
        )
        config = self.vapi.syslog_get_sender()
        self.assertEqual(str(config.collector_address), self.pg0.remote_ip4)
        self.assertEqual(config.collector_port, 514)
        self.assertEqual(str(config.src_address), self.pg0.local_ip4)
        self.assertEqual(config.vrf_id, 0)
        self.assertEqual(config.max_msg_size, 480)

        appname = "test"
        msgid = "testMsg"
        msg = "this is message"
        sd1 = {
            "exampleSDID@32473": {"iut": "3", "eventSource": "App", "eventID": "1011"}
        }
        sd2 = {
            "exampleSDID@32473": {"iut": "3", "eventSource": "App", "eventID": "1011"},
            "examplePriority@32473": {"class": "high"},
        }

        # An informational message without structured data is sent to the
        # collector; verify the IP/UDP envelope and then the payload.
        self.pg_enable_capture(self.pg_interfaces)
        self.syslog_generate(
            SyslogFacility.local7, SyslogSeverity.info, appname, msgid, None, msg
        )
        capture = self.pg0.get_capture(1)
        try:
            self.assertEqual(capture[0][IP].src, self.pg0.local_ip4)
            self.assertEqual(capture[0][IP].dst, self.pg0.remote_ip4)
            self.assertEqual(capture[0][UDP].dport, 514)
            self.assert_packet_checksums_valid(capture[0], False)
        except Exception:
            # Dump the offending packet to the log, then let the failure
            # propagate unchanged.
            self.logger.error(ppp("invalid packet:", capture[0]))
            raise
        self.syslog_verify(
            capture[0][Raw].load,
            SyslogFacility.local7,
            SyslogSeverity.info,
            appname,
            msgid,
            None,
            msg,
        )

        # With the filter set to WARN, the same informational message must
        # not produce any packet.
        self.pg_enable_capture(self.pg_interfaces)
        self.vapi.syslog_set_filter(self.SYSLOG_SEVERITY.SYSLOG_API_SEVERITY_WARN)
        syslog_filter = self.vapi.syslog_get_filter()
        self.assertEqual(
            syslog_filter.severity, self.SYSLOG_SEVERITY.SYSLOG_API_SEVERITY_WARN
        )
        self.syslog_generate(
            SyslogFacility.local7, SyslogSeverity.info, appname, msgid, None, msg
        )
        self.pg0.assert_nothing_captured()

        # A warning message carrying one structured-data element is sent.
        self.pg_enable_capture(self.pg_interfaces)
        self.syslog_generate(
            SyslogFacility.local6, SyslogSeverity.warning, appname, msgid, sd1, msg
        )
        capture = self.pg0.get_capture(1)
        self.syslog_verify(
            capture[0][Raw].load,
            SyslogFacility.local6,
            SyslogSeverity.warning,
            appname,
            msgid,
            sd1,
            msg,
        )

        # Switch the collector to a non-default port and check it is applied.
        self.vapi.syslog_set_sender(
            self.pg0.local_ip4, self.pg0.remote_ip4, collector_port=12345
        )
        config = self.vapi.syslog_get_sender()
        self.assertEqual(config.collector_port, 12345)

        # An error message with two structured-data elements and no free-form
        # text must arrive on the new destination port.
        self.pg_enable_capture(self.pg_interfaces)
        self.syslog_generate(
            SyslogFacility.local5, SyslogSeverity.err, appname, msgid, sd2, None
        )
        capture = self.pg0.get_capture(1)
        try:
            self.assertEqual(capture[0][UDP].dport, 12345)
        except Exception:
            self.logger.error(ppp("invalid packet:", capture[0]))
            raise
        self.syslog_verify(
            capture[0][Raw].load,
            SyslogFacility.local5,
            SyslogSeverity.err,
            appname,
            msgid,
            sd2,
            None,
        )
|
|
|
|
|
|
if __name__ == "__main__":
    # Executed as a script: hand the tests in this module to unittest,
    # using the framework's VppTestRunner instead of the default runner.
    unittest.main(testRunner=VppTestRunner)
|